In [1]:
import panel as pn
pn.extension('tabulator', sizing_mode='stretch_width', notifications=True)
from panel.io.notifications import NotificationArea
NotificationArea.position = 'bottom-right'

import pyvo as vo
import datetime as dt
from astropy.time import Time
import param
from pypika import Table, Criterion, EmptyCriterion, Order
import pandas as pd
from astroquery.simbad import Simbad
from astropy.coordinates import SkyCoord, Angle
from astropy import units as u
import warnings
from astroquery.exceptions import TableParseError
from astropy.io import fits
from matplotlib.figure import Figure
from matplotlib import cm
import numpy as np
from pathlib import Path

"""
This is still a very basic program and basically the only thing that it does is talk to the Kapteyn archive.
It shows the entries in the database for the specific filters that the user has specified
For now users can specify a date range, what kind of columns they want and whether the right ascension and declination should be null.
They can also search for an object using either a box or cone search.
They can then select entries from the generated table and view that data on another tab.
For now users cannot do anything with that data, but these functions will be added.
I will add the ability to plot, specify more filters and add some basic functions, such as aligning images.
If you have any suggestions on what you would like to see added, please let me know.
Also, if you see any code that could be better, let me know as well.
"""

In [2]:
# Make connection to the TAP service
url = "https://vo.astro.rug.nl/tap"
service = vo.dal.TAPService(url)

In [3]:
"""
To create the queries I use PyPika, which is a query builder
It supports several SQL dialects, but not ADQL, so I added some ADQL functioning
These classes override the existing PyPika classes
With this it should also be relatively easy to add more custom
"""

from pypika.queries import Query, QueryBuilder
from pypika.utils import builder, QueryException
from typing import Any, Optional
from enum import Enum

class Dialects(Enum):
    ADQL = 'adql'

class ADQLQuery(Query):
    @classmethod
    def _builder(cls, **kwargs: Any) -> "QueryBuilder":
        return ADQLQueryBuilder(**kwargs)

class ADQLQueryBuilder(QueryBuilder):
    QUOTE_CHAR = None
    ALIAS_QUOTE_CHAR = '"'
    QUERY_ALIAS_QUOTE_CHAR = ''
    QUERY_CLS = ADQLQuery

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(dialect=Dialects.ADQL, **kwargs)
        self._top = None
        self._contains_point = None
        self._contains_box = None

    @builder
    def top(self, value):
        try:
            self._top = int(value)
        except ValueError:
            raise QueryException('TOP value must be an integer')

    def get_sql(self, *args, **kwargs):
        return super(ADQLQueryBuilder, self).get_sql(*args, groupby_alias=False, **kwargs)

    def _top_sql(self):
        if self._top:
            return 'TOP {} '.format(self._top)
        else:
            return ''

    @builder
    def contains_point(self, ra, dec, from_table, cone_radius):
        self._contains_point = [ra, dec, from_table, cone_radius]

    def _contains_point_sql(self):
        if self._contains_point:
            return "CONTAINS(POINT('ICRS', {}, {}), CIRCLE('ICRS', {}.ra, {}.dec, {}))=1 AND ".format(
                self._contains_point[0], self._contains_point[1], self._contains_point[2], self._contains_point[2], self._contains_point[3])
        else:
            return ''

    @builder
    def contains_box(self, ra, dec, from_table, width, height):
        self._contains_box = [ra, dec, from_table, width, height]

    def _contains_box_sql(self):
        if self._contains_box:
            return "CONTAINS(POINT('ICRS', {}, {}), BOX('ICRS', {}.ra, {}.dec, {}, {}))=1 AND ".format(
                self._contains_box[0], self._contains_box[1], self._contains_box[2], self._contains_box[2], self._contains_box[3], self._contains_box[4])
        else:
            return ''

    def _select_sql(self, **kwargs: Any) -> str:
        return "SELECT {distinct}{top}{select}".format(
            top=self._top_sql(),
            distinct="DISTINCT " if self._distinct else "",
            select=",".join(term.get_sql(with_alias=True, subquery=True, **kwargs) for term in self._selects)
        )

    @builder
    def _where_sql(self, quote_char: Optional[str] = None, **kwargs: Any) -> str:
        return " WHERE {contains_point}{contains_box}{where}".format(
            contains_point=self._contains_point_sql(),
            contains_box = self._contains_box_sql(),
            where=self._wheres.get_sql(quote_char=quote_char, subquery=True, **kwargs)
        )

In [4]:
home_dir = '../Data/Raw/'

options = {'Filename': 'filename',
           'Observation date': 'kw_DATE_OBS',
           'Image type': 'kw_IMAGETYP',
           'Right ascension': 'ra',
           'Declination': 'dec',
           'Object': 'kw_OBJECT',
           'Filter': 'kw_FILTER'}
options_reverse = {'filename': 'Filename',
                   'kw_DATE_OBS': 'Observation date',
                   'kw_IMAGETYP': 'Image type',
                   'ra': 'Right ascension',
                   'dec': 'Declination',
                   'kw_OBJECT': 'Object',
                   'kw_FILTER': 'Filter',
                   'astrom': 'Astrometry'}
key_options = ['Filename', 'Filter', 'Object']
class CompileQuery(param.Parameterized):
    """
    Here all of the widgets needed for the query are generated.
    Users can change the settings and press the search button to query the database with their chosen options.
    """
    begin_date = dt.datetime(2008, 5, 1, 12)
    end_date = dt.datetime(2020, 4, 23, 12)
    dates = param.DateRange((begin_date, end_date), precedence=1)
    dates_order = param.Selector({'Descending': Order.desc, 'Ascending': Order.asc}, precedence=1)
    reset_date_button = param.Action(lambda x: x.param.trigger('reset_date_button'), label='Reset date', precedence=1)
    select_columns = param.ListSelector(default=['Filename',
                                                 'Image type',
                                                 'Filter',
                                                 'Observation date',
                                                 'Right ascension',
                                                 'Declination',
                                                 'Object'],
                                        objects=options.keys(),
                                        precedence=1)
    ra_dec_not_null = param.Selector({'RA & dec specified': True, 'All entries': False}, precedence=1)
    object_or_coordinates = param.Selector(['Object', 'Coordinates'], precedence=1)
    select_object = param.String(default='', precedence=1)
    search_coordinates = param.String(default='' , precedence=-1)
    box_or_cone = param.Selector(['Box', 'Cone'], precedence=1)
    cone_radius = param.Number(default=17, bounds=(1, 35), precedence=-1)
    box_width = param.Number(1700, precedence=1)
    box_height = param.Number(1100, precedence=1)
    nr_entries = param.Number(100, precedence=1)

    @param.depends('box_or_cone', watch=True)
    def box_cone_search(self):
        """
        Whenever the box option is chosen, the cone settings will disappear and vice versa
        """
        if self.box_or_cone == 'Box':
            self.param.cone_radius.precedence = -1
            self.param.box_width.precedence = 1
            self.param.box_height.precedence = 1
        else:
            self.param.cone_radius.precedence = 1
            self.param.box_width.precedence = -1
            self.param.box_height.precedence = -1

    @param.depends('object_or_coordinates', watch=True)
    def object_coordinates_search(self):
        """
        Whenever the object option is chosen, the coordinates settings will disappear and vice versa
        """
        if self.object_or_coordinates == 'Object':
            self.param.select_object.precedence = 1
            self.param.search_coordinates.precedence = -1
        else:
            self.select_object = ''
            self.param.select_object.precedence = -1
            self.param.search_coordinates.precedence = 1

    @param.depends('reset_date_button', watch=True)
    def reset_date(self):
        """
        Reset date to make it easier to search the whole archives
        End date should probably be given using the now function of datetime
        """
        self.dates = (self.begin_date, self.end_date)

    def table_string(self):
        """
        Specifies the table a user wants to search in
        Currently static, because the other tables cannot be searched
        """
        table_name = 'observations.raw'
        return table_name

    def from_query(self):
        """
        Creates the Table object needed for the query
        """
        raw_observations = Table(self.table_string())
        return raw_observations

    @param.depends('select_columns')
    def select_query(self):
        """
        Create a string containing all of the selected columns
        Check if the necessary columns are selected and add them if not
        Those columns will later be hidden
        """
        select_options = ''
        from_table = self.from_query()
        for i in self.select_columns:
            select_options += str(from_table).strip('"') + '.' + options[i] + ','
        for i in key_options:
            if i not in self.select_columns:
                select_options += str(from_table).strip('"') + '.' + options[i] + ','
        return select_options.rstrip(',')

    @param.depends('select_object')
    def find_object(self):
        """
        Check if the user has typed in an object
        Returns an error if the object cannot be found by SIMBAD
        Returns the right ascension and declination if the object can be found
        """
        if self.select_object == '':
            return True
        else:
            object_name = self.select_object
            with warnings.catch_warnings():
                warnings.filterwarnings('error')
                try:
                    simbad_object = Simbad.query_object(object_name)
                    coordinates = SkyCoord(simbad_object['RA'], simbad_object['DEC'], unit=(u.hourangle, u.deg))
                    object_ra, object_dec = coordinates.ra.deg[0], coordinates.dec.deg[0]
                    return object_ra, object_dec
                except (Warning, TableParseError):
                    return False

    @param.depends('search_coordinates')
    def coordinates_to_degrees(self):
        """
        Users can enter right ascension in hours, minutes and seconds
        Users can declination in degrees, minutes and seconds
        Here they are transformed to degrees, which can be used to search the database
        """
        if self.search_coordinates == '':
            return
        right_ascension, declination = self.search_coordinates.split(',')
        if len(right_ascension) > 1:
            if float(right_ascension.split(' ')[0]) >= 24:
                right_ascension = Angle(right_ascension, unit=u.degree)
                right_ascension.wrap_at(360 * u.degree, inplace=True)
            else:
                right_ascension = Angle(right_ascension, unit=u.hourangle)
                right_ascension.wrap_at(24 * u.hourangle, inplace=True)
        else:
            if float(right_ascension) >= 24:
                right_ascension = Angle(right_ascension, unit=u.degree)
                right_ascension.wrap_at(360 * u.degree, inplace=True)
            else:
                right_ascension = Angle(right_ascension, unit=u.hourangle)
                right_ascension.wrap_at(24 * u.hourangle, inplace=True)
        declination = Angle(declination, unit=u.deg)
        right_ascension = right_ascension.deg
        declination = declination.deg
        if declination < -90:
            while declination < -90:
                declination += 90
        elif declination > 90:
            while declination > 90:
                declination -= 90
        return right_ascension, declination

    @param.depends('ra_dec_not_null')
    def not_null_check(self):
        """
        Check if user wants right ascension and declination to be not null
        """
        if self.ra_dec_not_null:
            return True

    @param.depends('dates')
    def date_query(self):
        """
        Create Julian dates from the dates specified by the user
        """
        start_date = Time(self.dates[0]).jd
        end_date = Time(self.dates[1]).jd
        return start_date, end_date

In [5]:
class FetchData(CompileQuery):
    """
    This class fetches all the parts of the query and searches the archive for corresponding entries
    It will also change the notation of the right ascension and declination
    It will show errors when either the object cannot be found or no entries with the specified options can be found
    """
    search_object_button = param.Action(lambda x: x.param.trigger('search_object_button'), label='Search', precedence=5)
    reset_search_button = param.Action(lambda x: x.param.trigger('reset_search_button'), label='Reset search', precedence=5)

    @param.depends('reset_search_button', watch=True)
    def reset_search(self):
        """
        An easy way for the user to reset the options and search the whole archive
        """
        self.dates = (self.begin_date, self.end_date)
        self.ra_dec_not_null = True
        self.select_object = ''
        self.cone_radius = 17
        self.box_width = 1700
        self.box_height = 1100
        self.param.trigger('search_object_button')

    @param.depends('search_object_button')
    def data(self):
        """
        If the specified object could not be found, it will stop immediately
        If the object could be found or no object was specified, it will compile the query
        It takes all the parts from the previous class and creates the query using PyPika
        I made some of my own classes to add some ADQL functionality
        Once the archive has been searched, the entries are converted to a pandas DataFrame for easier manipulation
        It changes the right ascension and declination notation to contains hours/degrees, minutes and seconds
        """
        if not self.find_object():
            return
        observation_table = self.from_query()
        dates = self.date_query()
        query = ADQLQuery.from_(
            observation_table
        ).select(
            self.select_query()
        )
        if self.select_object != '':
            self.ra_dec_not_null = True
            right_ascension, declination = self.find_object()
            if self.box_or_cone == 'Cone':
                query = query.contains_point(right_ascension, declination, self.table_string(), self.cone_radius/60)
            elif self.box_or_cone == 'Box':
                query = query.contains_box(self.find_object()[0], self.find_object()[1], self.table_string(), self.box_width/3600, self.box_height/3600)
        elif self.select_object == '' and self.search_coordinates != '':
            self.ra_dec_not_null = True
            right_ascension, declination = self.coordinates_to_degrees()
            if self.box_or_cone == 'Cone':
                query = query.contains_point(right_ascension, declination, self.table_string(), self.cone_radius/60)
            elif self.box_or_cone == 'Box':
                query = query.contains_box(right_ascension, declination, self.table_string(), self.box_width/3600, self.box_height/3600)
        query = query.where(
            Criterion.all([
                observation_table.ra.isnotnull() if self.not_null_check() else EmptyCriterion(),
                observation_table.dec.isnotnull() if self.not_null_check() else EmptyCriterion(),
                observation_table.obs_jd[dates[0]:dates[1]]
            ])
        ).orderby(observation_table.obs_jd, order=self.dates_order).top(self.nr_entries)
        result = service.search(query.get_sql())
        data_pandas = result.to_table().to_pandas()
        data_pandas['ra'] = data_pandas['ra'].apply(self.ra_to_hms)
        data_pandas['dec'] = data_pandas['dec'].apply(self.dec_to_dms)
        data_pandas['astrom'] = data_pandas['filename'].apply(self.check_astrom)
        first_column = data_pandas.pop('astrom')
        data_pandas.insert(0, 'astrom', first_column)
        return data_pandas

    def check_astrom(self, filename):
        """
        Check if a file is an Astrometry file
        """
        if "astrom" in filename:
            return True
        else:
            return False

    def ra_to_hms(self, nr_angle):
        """
        Changes the right ascension notation from degrees to hours, minutes and seconds
        """
        angle = Angle(nr_angle, u.degree)
        return angle.to_string(unit=u.hour, sep=('h:', 'm:', 's'), precision=3)

    def dec_to_dms(self, nr_angle):
        """
        Changes the declination notation from degrees to degrees, minutes and seconds
        """
        angle = Angle(nr_angle, u.degree)
        return angle.to_string(unit=u.degree, sep=('d:', 'm:', 's'), precision=3)

    @param.depends('search_object_button')
    def display_object_error(self):
        """
        Displays an indefinite error whenever SIMBAD cannot find the specified object
        """
        if not self.find_object():
            message = "SIMBAD could not find the object " + str(self.select_object) + " that you searched for."
            pn.state.notifications.error(message, duration=0)

    @param.depends('search_object_button', 'data')
    def display_data_error(self):
        """
        Displays an indefinite error whenever no entries could be found in the archive with the specified options
        """
        if self.find_object() and self.data().values.size == 0:
            message = "No entries could be found with the options you specified."
            pn.state.notifications.error(message, duration=0)

In [6]:
class CreateTable(FetchData):
    """
    This class creates the responsive tables using the Tabulator widget
    This widget is highly customizable and interactive
    It allows users to select specific entries and these entries are displayed in another table on another tab
    """
    table = param.DataFrame(columns=CompileQuery().select_columns, precedence=-1)
    if FetchData().data() is None:
        table.default = pd.DataFrame(columns=CompileQuery().select_columns)
    else:
        table.default = FetchData().data()
        table.columns = options_reverse

    def __init__(self, **params):
        """
        Create the Tabulator widgets so they are not recreated every time a query is executed (as far as I know)
        """
        super().__init__(**params)
        self.tabulator_formatters = {
            'astrom': {'type': 'tickCross'}
        }
        self.table_widget = pn.widgets.Tabulator(self.table,
                                                 disabled=True,
                                                 pagination='local',
                                                 show_index=False,
                                                 layout='fit_columns',
                                                 selectable='checkbox-single',
                                                 text_align='left',
                                                 min_height=500,
                                                 widths={'astrom': 105},
                                                 titles=options_reverse,
                                                 formatters=self.tabulator_formatters)

    @param.depends('data', watch=True)
    def full_table(self):
        """
        If no data could be found, the Tabulator will be left empty
        Otherwise the data is added to the widget and the correct column names are given
        """
        if self.data() is None:
            self.table_widget.value = pd.DataFrame(columns=self.select_columns)
        else:
            self.table_widget.value = self.data()
            self.table_widget.titles = options_reverse
            columns_to_hide = []
            for i in key_options:
                if i not in self.select_columns:
                    columns_to_hide.append(options[i])
            self.table_widget.hidden_columns = columns_to_hide

    selected_table_widget = pn.widgets.Tabulator(pd.DataFrame(),
                                                 disabled=True,
                                                 pagination='local',
                                                 page_size=8,
                                                 show_index=False,
                                                 layout='fit_data',
                                                 selectable='checkbox',
                                                 text_align='left',
                                                 min_height=300,
                                                 titles=options_reverse)

    @param.depends('table_widget.selection')
    def selected_table(self):
        """
        Checks if entries have been selected from the main table
        If not, the widget will be left empty
        Otherwise the data is added with specific column names (I think only these columns are needed here)
        It also changes the filepath to the filename
        """
        selection = self.table_widget.selected_dataframe
        if selection.values.size == 0:
            self.selected_table_widget.value = pd.DataFrame(columns=['filename', 'kw_FILTER', 'kw_OBJECT'])
        else:
            selection = selection[['filename', 'kw_FILTER', 'kw_OBJECT']]
            selection['filename'] = selection['filename'].apply(self.give_file_name)
            self.selected_table_widget.value = selection
        return self.selected_table_widget

    def give_file_name(self, filepath):
        """
        Given a file path, it will return the filename
        Seems to be the best way to display the files for the selected table
        """
        return Path(filepath).name

In [7]:
class CreatePlots(CreateTable):
    """
    Creates the plots of the files selected by the user
    """
    plot_button = param.Action(lambda x: x.param.trigger('plot_button'), label='Plot', precedence=5)

    def __init__(self, **params):
        super().__init__(**params)
        self.cannot_find_file = False

    def get_file_list(self):
        """
        Gather a list of all files selected by the user
        """
        selected = self.selected_table_widget.selected_dataframe
        if len(selected) == 0 or len(selected) > 3:
            return None
        filename_list = []
        for i in selected['filename']:
            filepath = home_dir + i
            if Path(filepath).is_file():
                filename_list.append(home_dir + i)
            else:
                self.cannot_find_file = True
                return None
        return filename_list

    @param.depends('plot_button')
    def single_plot(self, filename):
        """
        Function to create a single plot based on the filename
        """
        fits_file = fits.open(filename)
        fits_data = fits_file[0].data
        filter_type = fits_file[0].header['FILTER']
        fits_file.close()
        vmin = np.percentile(fits_data, 5)
        vmax = np.percentile(fits_data, 95)
        fig = Figure(figsize=(9, 6))
        ax = fig.subplots()
        short_name = self.give_file_name(filename)
        ax.set_title(short_name + " (filter: " + filter_type + ")")
        img = ax.imshow(fits_data, interpolation='none', origin='lower', cmap=cm.gray, vmin=vmin, vmax=vmax)
        fig.colorbar(img, shrink=0.8)
        img.axes.get_xaxis().set_visible(False)
        img.axes.get_yaxis().set_visible(False)
        return pn.pane.Matplotlib(fig, tight=True, sizing_mode='scale_both')

    @param.depends('plot_button')
    def file_not_found(self):
        """
        Error to display if a file could not be found
        """
        if self.cannot_find_file:
            self.cannot_find_file = False
            message = "One of the files you tried to plot could not be found."
            pn.state.notifications.error(message, duration=0)

    @param.depends('plot_button')
    def plot1(self):
        """
        Plots the first file, if any are present
        """
        if self.get_file_list() is None:
            return pn.Card()
        else:
            file = self.get_file_list()[0]
            return pn.Card(self.single_plot(file))

    @param.depends('plot_button')
    def plot2(self):
        """
        Plots the second file, if at least two are present
        """
        if self.get_file_list() is None or len(self.get_file_list()) < 2:
            return pn.Card()
        else:
            file = self.get_file_list()[1]
            return pn.Card(self.single_plot(file))

    @param.depends('plot_button')
    def plot3(self):
        """
        Plots the third file, if at least three are present
        """
        if self.get_file_list() is None or len(self.get_file_list()) < 3:
            return pn.Card()
        else:
            file = self.get_file_list()[2]
            return pn.Card(self.single_plot(file))

    @param.depends('plot_button')
    def selection_error(self):
        """
        Displays errors when no files or more than three files are chosen
        """
        if len(self.selected_table_widget.selection) == 0:
            message = "You have to select at least one file to plot."
            pn.state.notifications.error(message, duration=0)
        elif len(self.selected_table_widget.selection) > 3:
            message = "You can only select three files at the same time for plotting."
            pn.state.notifications.error(message, duration=0)

In [None]:
class Statistics(CreatePlots):
    """
    Gathers some basic statistics for a chosen night
    This data can be used later on to plot the information
    """
    night = param.Date(dt.date(2020, 4, 22), precedence=1)
    statistics_button = param.Action(lambda x: x.param.trigger('statistics_button'), label='Plot statistics', precedence=5)

    def __init__(self, **params):
        super().__init__(**params)
        self.all_data = pd.DataFrame()
        self.bias_df = pd.DataFrame()
        self.dark_df = pd.DataFrame()
        self.flat_df = pd.DataFrame()

    def gather_data(self):
        """
        Gather the data from the database for a specific night
        """
        begin_night = dt.datetime.combine(self.night, dt.time(12, 0))
        begin_night_jd = Time(begin_night).jd
        end_night = begin_night + dt.timedelta(days=1)
        end_night_jd = Time(end_night).jd
        table_name = self.from_query()
        query = ADQLQuery.from_(
            table_name
        ).select(
            table_name.filename, table_name.kw_IMAGETYP, table_name.kw_XBINNING
        ).where(
            table_name.obs_jd[begin_night_jd:end_night_jd]
        ).where(
            Criterion.any([
                table_name.kw_IMAGETYP == 'Bias Frame',
                table_name.kw_IMAGETYP == 'Dark Frame',
                table_name.kw_IMAGETYP == 'Flat Field'
            ])
        ).top(3000)
        result = service.search(query.get_sql())
        self.all_data = result.to_table().to_pandas()

    @param.depends('statistics_button')
    def night_statistics_error(self):
        """
        Displays an error if no observation data could be found for the chosen night
        """
        if self.all_data.size == 0:
            message = "There is no observation data for this night."
            pn.state.notifications.error(message, duration=0)

    @param.depends('statistics_button', watch=True)
    def calibration_files(self):
        """
        Puts the different calibration files in the correct DataFrame
        """
        self.gather_data()
        self.bias_df = self.all_data[self.all_data['kw_IMAGETYP']=='Bias Frame']
        self.dark_df = self.all_data[self.all_data['kw_IMAGETYP']=='Dark Frame']
        self.flat_df = self.all_data[self.all_data['kw_IMAGETYP']=='Flat Field']

    def load_file(self, binning, file_type):
        """
        Loads a file containing data about previous nights
        """
        data = np.loadtxt('merged_statistics/' + file_type + '_bin' + str(binning) + '/' + file_type + '_bin' + str(binning) + '.txt', dtype='str')
        return data

    def bias_statistics(self, binning):
        """
        Calculates some basic statistics about the bias frames and read noise
        """
        bias_median = []
        read_noise = []
        bias_bin_df = self.bias_df[self.bias_df['kw_XBINNING']==binning]
        if bias_bin_df.empty:
            return []
        else:
            for i in bias_bin_df['filename']:
                filename = self.give_file_name(i)
                filepath = home_dir + filename
                fits_file = fits.open(filepath)
                fits_data = fits_file[0].data
                fits_file.close()
                bias_median.append(np.median(fits_data))
                read_noise.append(np.var(fits_data)**0.5)
            bias_merged = np.column_stack((bias_median, read_noise))
            return bias_merged

    def dark_statistics(self, binning):
        """
        Calculates some basic statistics about the dark frames
        """
        dark_median = []
        dark_bin_df = self.dark_df[self.dark_df['kw_XBINNING']==binning]
        if dark_bin_df.empty:
            return []
        else:
            bias_data = np.array(self.bias_statistics(binning=binning))
            if len(bias_data) > 0:
                master_bias = np.median(bias_data[:, 0])
            else:
                master_bias_data = self.load_file(binning, 'bias')[:, 1].astype(float)
                master_bias = np.median(master_bias_data)
            for i in self.dark_df['filename']:
                filename = self.give_file_name(i)
                filepath = home_dir + filename
                fits_file = fits.open(filepath)
                fits_data = fits_file[0].data
                exposure_time = fits_file[0].header['EXPTIME']
                fits_file.close()
                dark_corrected = (fits_data - master_bias) / exposure_time
                dark_median.append(np.median(dark_corrected))
            dark_median = np.array(dark_median)
            dark_median = dark_median[np.isfinite(dark_median)]
            return dark_median

    def flat_statistics(self, binning):
        """
        Calculates some basic statistics about the flat fields
        """
        flat_median = []
        flat_filters = []
        flat_bin_df = self.flat_df[self.flat_df['kw_XBINNING']==binning]
        if flat_bin_df.empty:
            return []
        else:
            for i in self.flat_df['filename']:
                filename = self.give_file_name(i)
                filepath = home_dir + filename
                fits_file = fits.open(filepath)
                fits_data = fits_file[0].data
                file_filter = fits_file[0].header['FILTER']
                fits_file.close()
                flat_median.append(np.median(fits_data))
                flat_filters.append(file_filter)
            flat_merged = np.column_stack((flat_median, flat_filters))
            return flat_merged

In [None]:
line_colours = ['g', 'r', 'm', 'c']
linestyles = ['dotted', 'dashed', 'dashdot', 'solid']
bar_colours = ['b', 'y', 'chocolate', 'lime']

class StatisticsPlots(Statistics):
    """
    Creates the plots for the basic statistics tabs
    """
    vmin_vmax_range = param.Range((0, 95), bounds=(0, 100), step=5)

    def vmin_vmax(self, data_array):
        """
        Calculates the values for the minimum and maximum percentile given by the user
        """
        vmin = np.percentile(data_array, self.vmin_vmax_range[0])
        vmax = np.percentile(data_array, self.vmin_vmax_range[1])
        return vmin, vmax

    def nr_bins(self, length):
        """
        Calculates the number of bins needed for the histogram
        """
        bins = np.ceil(np.sqrt(length))
        return bins

    @param.depends('statistics_button')
    def bias_median_plot(self):
        """
        Creates a plot containing old bias data and bias data for the given night
        """
        if self.bias_df.empty:
            return pn.Card()
        else:
            fig = Figure(figsize=(9, 6))
            ax = fig.subplots()
            ax.set_title("Median of the bias frames")
            binning_list = self.bias_df.kw_XBINNING.unique()
            binning_list = np.sort(binning_list)
            for i in binning_list:
                bias_median = self.bias_statistics(binning=i)[:, 0]
                binning_string = " (" + str(i) + "x" + str(i) + "binning)"
                if bias_median.size != 0:
                    ax.axvline(np.median(bias_median), label="Median of the night" + binning_string, linewidth=2, linestyle=linestyles[i-1], color=line_colours[i-1], alpha=0.5)
                old_bias_median = self.load_file(i, 'bias')[:, 1].astype(float)
                vmin, vmax = self.vmin_vmax(old_bias_median)
                bins = self.nr_bins(len(old_bias_median))
                ax.hist(old_bias_median, bins=int(bins), label="Previous data" + binning_string, alpha=0.5, range=(vmin, vmax), color=bar_colours[i-1])
            ax.legend()
            return pn.Card(pn.pane.Matplotlib(fig, tight=True, sizing_mode='scale_both'))

    @param.depends('statistics_button')
    def read_noise_plot(self):
        """
        Creates a plot containing old read noise data and read noise data for the given night
        """
        if self.bias_df.empty:
            return pn.Card()
        else:
            fig = Figure(figsize=(9, 6))
            ax = fig.subplots()
            ax.set_title("Median of the read noise")
            binning_list = self.bias_df.kw_XBINNING.unique()
            binning_list = np.sort(binning_list)
            for i in binning_list:
                read_noise = self.bias_statistics(binning=i)[:, 1]
                binning_string = " (" + str(i) + "x" + str(i) + "binning)"
                if read_noise.size != 0:
                    ax.axvline(np.median(read_noise), label="Median of the night" + binning_string, linewidth=2, linestyle=linestyles[i-1], color=line_colours[i-1], alpha=0.5)
                old_read_noise = self.load_file(i, 'bias')[:, 2].astype(float)
                vmin, vmax = self.vmin_vmax(old_read_noise)
                bins = self.nr_bins(len(old_read_noise))
                ax.hist(old_read_noise, bins=int(bins), label="Previous data" + binning_string, alpha=0.5, range=(vmin, vmax), color=bar_colours[i-1])
            ax.legend()
            return pn.Card(pn.pane.Matplotlib(fig, tight=True, sizing_mode='scale_both'))

    @param.depends('statistics_button')
    def dark_median_plot(self):
        """
        Creates a plot containing old dark data and dark data for the given night
        """
        if self.dark_df.empty:
            return pn.Card()
        else:
            fig = Figure(figsize=(9, 6))
            ax = fig.subplots()
            ax.set_title("Median of the dark frames")
            binning_list = self.dark_df.kw_XBINNING.unique()
            binning_list = np.sort(binning_list)
            for i in binning_list:
                dark_median = self.dark_statistics(binning=i)
                binning_string = " (" + str(i) + "x" + str(i) + "binning)"
                if dark_median.size != 0:
                    ax.axvline(np.median(dark_median), label="Median of the night" + binning_string, linewidth=2, linestyle=linestyles[i-1], color=line_colours[i-1], alpha=0.5)
                old_dark_file = self.load_file(i, 'dark')
                old_dark_median = old_dark_file[:, 1].astype(float)
                old_dark_exp = old_dark_file[:, 2].astype(float)
                old_bias_median = self.load_file(i, 'bias')[:, 1].astype(float)
                old_dark_median = (old_dark_median - np.median(old_bias_median)) / old_dark_exp
                bins = self.nr_bins(len(old_dark_median))
                old_dark_median = old_dark_median[np.isfinite(old_dark_median)]
                vmin, vmax = self.vmin_vmax(old_dark_median)
                ax.hist(old_dark_median, bins=int(bins), label="Previous data" + binning_string, alpha=0.5, range=(vmin, vmax), color=bar_colours[i - 1])
            ax.legend()
            return pn.Card(pn.pane.Matplotlib(fig, tight=True, sizing_mode='scale_both'))

    @param.depends('statistics_button')
    def flat_median_plot(self):
        """
        Creates a plot containing old flat data and flat data for the given night
        """
        if self.flat_df.empty:
            return pn.Card()
        else:
            fig = Figure(figsize=(9, 6))
            ax = fig.subplots()
            ax.set_title("Median of the flat frames")
            binning_list = self.flat_df.kw_XBINNING.unique()
            binning_list = np.sort(binning_list)
            for i in binning_list:
                flat_median = self.flat_statistics(binning=i)[:, 0].astype(float)
                binning_string = " (" + str(i) + "x" + str(i) + "binning)"
                if flat_median.size != 0:
                    ax.axvline(np.median(flat_median), label="Median of the night" + binning_string, linewidth=2, linestyle=linestyles[i-1], color=line_colours[i-1], alpha=0.5)
                old_flat_median = self.load_file(i, 'flat')[:, 1].astype(float)
                vmin, vmax = self.vmin_vmax(old_flat_median)
                bins = self.nr_bins(len(old_flat_median))
                ax.hist(old_flat_median, bins=int(bins), label="Previous data" + binning_string, alpha=0.5, range=(vmin, vmax), color=bar_colours[i-1])
            ax.legend()
            return pn.Card(pn.pane.Matplotlib(fig, tight=True, sizing_mode='scale_both'))

In [None]:
class CustomGrid(pn.GridBox):
    """
    Custom grid for the layout
    """
    def __init__(self, *objects, **params):
        super().__init__(*objects, **params, ncols=2, nrows=1)

class CreateView(StatisticsPlots):
    """
    This class creates the final view that the user will see
    It gets all the widgets and data together and puts it into a nice UI
    """
    def panel(self):
        """
        Create Panel widgets from the parametrized classes
        Creates the different tabs and adds widgets accordingly
        Displays the dashboard
        """
        widgets_primary = {
            'dates': {'widget_type': pn.widgets.DatetimeRangePicker, 'max_width': 350, 'name': 'Date range'},
            'dates_order': {'widget_type': pn.widgets.Select, 'max_width': 160, 'name': 'Date sorting'},
            'reset_date_button': {'widget_type': pn.widgets.Button, 'button_type': 'warning', 'max_width': 160, 'align': 'end'},
            'select_columns': {'widget_type': pn.widgets.CrossSelector, 'definition_order': False, 'name': 'Columns'},
            'ra_dec_not_null': {'widget_type': pn.widgets.RadioButtonGroup},
            'object_or_coordinates': {'widget_type': pn.widgets.RadioButtonGroup, 'inline': True},
            'select_object': pn.widgets.TextInput,
            'search_coordinates': {'widget_type': pn.widgets.TextInput, 'name': 'Coordinates (hh mm ss.ms, dd mm ss.ms)'},
            'box_or_cone': {'widget_type': pn.widgets.RadioButtonGroup, 'inline': True},
            'cone_radius': {'widget_type': pn.widgets.FloatSlider, 'name': 'Cone radius (arc minutes)' , 'step': 0.5},
            'box_width': {'widget_type': pn.widgets.FloatInput, 'name': 'Box width (arc seconds)', 'max_width': 160},
            'box_height': {'widget_type': pn.widgets.FloatInput, 'name': 'Box height (arc seconds)', 'max_width': 160},
            'nr_entries': {'widget_type': pn.widgets.IntInput, 'name': '# of entries (max 5000)', 'start': 1, 'end': 5000, 'step': 1},
            'search_object_button': {'widget_type': pn.widgets.Button, 'button_type': 'primary'},
            'reset_search_button': {'widget_type': pn.widgets.Button, 'button_type': 'warning'}
        }
        settings_primary1 = pn.Row(
            pn.Param(self, widgets=widgets_primary, width=385, sizing_mode="fixed", name="Settings", parameters=[
                'dates'
            ]),
            height = 80
        )
        settings_primary2 = pn.Row(
            pn.Param(self, widgets=widgets_primary, width=385, sizing_mode="fixed", default_layout=CustomGrid, show_name=False, parameters=[
                'dates_order',
                'reset_date_button'
            ])
        )
        settings_primary3 = pn.Row(
            pn.Param(self, widgets=widgets_primary, width=385, sizing_mode="fixed", show_name=False, parameters=[
                'select_columns',
                'ra_dec_not_null',
                'object_or_coordinates',
                'select_object',
                'search_coordinates',
                'box_or_cone',
                'cone_radius'
            ]),
            height = 380
        )
        settings_primary4 = pn.Row(
            pn.Param(self, widgets=widgets_primary, width=385, sizing_mode="fixed", default_layout=CustomGrid, show_name=False, parameters=[
                'box_width',
                'box_height'
            ]),
            height = 70
        )
        settings_primary5 = pn.Row(
            pn.Param(self, widgets=widgets_primary, width=385, sizing_mode="fixed", show_name=False, parameters=[
                'nr_entries',
                'search_object_button',
                'reset_search_button'
            ])
        )
        settings_tabs = pn.Tabs(
            ('Query', pn.Column(
                settings_primary1,
                settings_primary2,
                settings_primary3,
                settings_primary4,
                settings_primary5
            ))
        )

        widgets_secondary = {
            'night': {'widget_type': pn.widgets.DatePicker, 'name': 'Pick a night'},
            'statistics_button': {'widget_type': pn.widgets.Button, 'button_type': 'primary'},
            'vmin_vmax_range' : {'widget_type': pn.widgets.IntRangeSlider, 'start': 0, 'end': 100, 'value': (0, 95), 'step': 5, 'name': 'Minimum and maximum percentile'}
        }
        settings_secondary = pn.Row(
            pn.Param(self, widgets=widgets_secondary, width=385, sizing_mode="fixed", name="Settings", parameters=[
                'night',
                'vmin_vmax_range',
                'statistics_button'
            ])
        )
        settings_tabs.append(
            ('Statistics', pn.Column(
                settings_secondary
            ))
        )

        bootstrap.sidebar.append(settings_tabs)

        panel_plot_button = {'plot_button': {'widget_type': pn.widgets.Button, 'button_type': 'primary', 'name': 'Plot'}}
        plot_grid = pn.Column(
            pn.Row(
                pn.Card(self.selected_table,
                        pn.Param(self, widgets=panel_plot_button, show_name=False, parameters=['plot_button'])),
                self.plot1
            ),
            pn.Row(
                self.plot2,
                self.plot3
            ),
            self.selection_error,
            self.file_not_found
        )

        statistics_grid = pn.Column(
            pn.Row(
                self.bias_median_plot,
                self.read_noise_plot
            ),
            pn.Row(
                self.dark_median_plot,
                self.flat_median_plot
            ),
            self.night_statistics_error
        )

        user_guide = pn.pane.HTML("""
        <iframe src="https://docs.google.com/viewer?url=User_Guide.pdf&embedded=true"></iframe>
        """)

        main_tabs = pn.Tabs(
            ('Data Table', pn.Column(self.table_widget,
                                     self.display_object_error,
                                     self.display_data_error)
             ),
            ('Data Plotting', plot_grid),
            ('Statistics Plotting', statistics_grid),
            ('User Guide', user_guide)
        )

        def change_settings_tabs(target, event):
            """
            Changes the settings tab based on which main tab is active
            """
            if event.new == 0 or event.new == 1:
                settings_tabs.active = 0
            else:
                settings_tabs.active = 1

        main_tabs.link(settings_tabs, callbacks={'active': change_settings_tabs})

        bootstrap.main.append(main_tabs)
        return bootstrap

In [8]:
"""
This is what calls the application and displays it
"""
bootstrap = pn.template.BootstrapTemplate(title='Blaauw Dashboard',
                                          sidebar_width=400,
                                          header_background='#bf1900',
                                          logo='header_logo/rug_logo_white.png')

dashboard = CreateView()
dashboard.panel().servable()

Launching server at http://localhost:59215
