In [None]:
# default_exp utils

# utils

> Supplies basic utility functions.

In [None]:
#export
import datetime
from typing import Union
import os
import shutil
import json

import numpy as np
from bokeh.plotting import figure, show
from bokeh.models import ColumnDataSource, TableColumn, DataTable, LinearColorMapper, ColorBar
from bokeh.models.widgets import HTMLTemplateFormatter
from bokeh.palettes import Viridis
import panel as pn
import panel.widgets as pnw
import pandas as pd

import icevision.parsers as parsers
from icevision.data.data_splitter import RandomSplitter
from icevision.core.bbox import BBox

from icevision.visualize.draw_data import draw_record, draw_pred
from icevision.core.class_map import ClassMap

In [None]:
# Initialise panel for notebook use before any panel object is displayed in the cells below.
pn.extension()

## Test data setup

In [None]:
import icedata

In [None]:
# Fixtures for the test cells below: load the icedata "fridge" data, build its class map
# and parse it into two record lists (train / valid).
test_data_dir = icedata.fridge.load_data()
test_class_map = icedata.fridge.class_map()
test_parser = icedata.fridge.parser(test_data_dir, test_class_map)
test_train_records, test_valid_records = test_parser.parse()

## Data

In [None]:
#export
class ObservableList:
    """Thin wrapper around a list-like object that notifies observers about changes.

    Every modification made *through the wrapper* (item assignment, `append`, `remove`, `insert`,
    `pop`, `extend` or replacing the whole list via the `observable_list` setter) calls all
    registered callbacks with the wrapped list as the only argument. Changes made directly on the
    object returned by `observable_list` are not observed.
    """
    def __init__(self, observable_list):
        self._observable_list = observable_list
        self._observer = []

    def register_callback(self, callback):
        """Registers a callable that receives the wrapped list after each observed change."""
        self._observer.append(callback)

    def _trigger_observer(self):
        """Calls every registered callback with the current wrapped list."""
        for notify in self._observer:
            notify(self._observable_list)

    def _mutate(self, method_name, *args):
        """Runs the named method of the wrapped list, notifies the observers and returns the method's result."""
        result = getattr(self._observable_list, method_name)(*args)
        self._trigger_observer()
        return result

    @property
    def observable_list(self):
        return self._observable_list

    @observable_list.setter
    def observable_list(self, value):
        self._observable_list = value
        self._trigger_observer()

    def __repr__(self):
        return repr(self._observable_list)

    def __iter__(self):
        yield from self._observable_list

    def __len__(self):
        return len(self._observable_list)

    def __getitem__(self, idx):
        return self._observable_list[idx]

    def __setitem__(self, idx, value):
        self._observable_list[idx] = value
        self._trigger_observer()

    def append(self, item):
        self._mutate("append", item)

    def remove(self, item):
        self._mutate("remove", item)

    def insert(self, index, item):
        self._mutate("insert", index, item)

    def pop(self, index=-1):
        return self._mutate("pop", index)

    def extend(self, iterable):
        self._mutate("extend", iterable)

In [None]:
# Every mutation goes through the ObservableList wrapper itself (not through the raw list returned
# by `observable_list`), otherwise no callback would be triggered. The callback stores a snapshot,
# so each assertion checks the state reported by the callback for exactly that mutation.
obs_list = ObservableList([])
call_register = []
obs_list.register_callback(lambda x: call_register.append(list(x)))

obs_list.observable_list = [1]
assert call_register[-1] == [1]

obs_list.append(2)
assert call_register[-1] == [1, 2]

assert obs_list.pop() == 2
assert call_register[-1] == [1]

obs_list.extend([3,4,5])
assert call_register[-1] == [1, 3, 4, 5]

obs_list.insert(2, 4)
assert call_register[-1] == [1, 3, 4, 4, 5]

obs_list.remove(4)
assert call_register[-1] == [1, 3, 4, 5]

obs_list[0] = 0
assert call_register[-1] == [0, 3, 4, 5]

# one callback per mutation
assert len(call_register) == 7

In [None]:
#export
class RecordDataframeParser(parsers.FasterRCNN, parsers.FilepathMixin, parsers.SizeMixin):
    """Parser that reads records from a dataframe with one row per annotation.

    Rows are grouped by the "filepath" column; each group is handed to the accessor methods below
    as `o`. Image level values ("id", "filepath", "width", "height") are read from the first row of
    the group, annotation level values (bbox coordinates, "label_num") from every row.
    """
    def __init__(self, record_dataframe):
        self.record_dataframe = record_dataframe

    def __iter__(self):
        # one item per image: all annotation rows that share the same filepath
        for _, image_rows in self.record_dataframe.groupby("filepath"):
            yield image_rows

    def imageid(self, o):
        first_row = o.iloc[0]
        return first_row["id"]

    def filepath(self, o):
        first_row = o.iloc[0]
        return first_row["filepath"]

    def image_width_height(self, o):
        first_row = o.iloc[0]
        return (first_row["width"], first_row["height"])

    def bboxes(self, o):
        boxes = []
        for _, annot in o.iterrows():
            boxes.append(BBox(annot["bbox_xmin"], annot["bbox_ymin"], annot["bbox_xmax"], annot["bbox_ymax"]))
        return boxes

    def labels(self, o):
        return [annot["label_num"] for _, annot in o.iterrows()]

In [None]:
#export
def create_class_map_from_record_df(record_df):
    """Builds a ClassMap from the "label" and "label_num" columns of a record dataframe.

    The list handed to ClassMap is positional: entry i is the label belonging to label_num i.
    Ids between 0 and the highest label_num that do not occur in the dataframe are filled with
    the placeholder "unknown_<i>".

    Raises a ValueError if the dataframe contains no rows.
    """
    # pair each label_num with the label of its first occurrence (independent of the row order)
    pairs = record_df[["label_num", "label"]].drop_duplicates(subset="label_num")
    label_map = dict(zip(pairs["label_num"].tolist(), pairs["label"].tolist()))
    # + 1 so that the highest label_num itself gets an entry as well
    num_ids = int(max(label_map)) + 1
    # NOTE(review): if ClassMap inserts its own background entry the positions shift by one —
    # confirm against the icevision version in use.
    return ClassMap([label_map.get(i, "unknown_" + str(i)) for i in range(num_ids)])

In [None]:
#export
class RecordDataset:
    """Holds a list of records together with lazily calculated statistics about them.

    `records` is either a list / ObservableList of records or the path of a json file written by
    `save`. The derived values (`data`, `dataset_stats`, `class_stats`, `image_stats`) are
    calculated on first access and cached; the cache is cleared whenever the records change
    through the ObservableList wrapper or when the name / description is set.
    """
    # column order of the dataframe created by calculate_record_data (before "label_num" is added)
    _RECORD_COLUMNS = (
        "id", "width", "height", "label", "bbox_xmin", "bbox_xmax", "bbox_ymin", "bbox_ymax", "area",
        "area_normalized", "bbox_ratio", "record_index", "bbox_width", "bbox_height", "filepath",
        "creation_date", "modification_date", "num_annotations",
    )

    # the annotation is quoted so it is not evaluated when the class is defined
    def __init__(self, records: "Union[list, ObservableList, str]", class_map=None, name=None, description=None):
        # set up the bookkeeping first, so a reset triggered by the records always finds it
        self.__variables = []
        self._callbacks = []
        self.register_variable("_data")
        self.register_variable("_dataset_stats")
        self.register_variable("_class_stats")
        self.register_variable("_image_stats")

        if isinstance(records, str):
            # class_map, name and description are taken from the file in this case
            self.load_from_file(records)
        else:
            self.records = records if isinstance(records, ObservableList) else ObservableList(records)
            self.class_map = class_map
            self._name = "dataset" if name is None else name
            self._description = "" if description is None else description

        self.records.register_callback(self.reset_infert_data)

    def __repr__(self):
        return " | ".join(str(key) + ": " + str(value) for key, value in self.dataset_stats.items())

    def register_callback(self, callback):
        """Stores a callable without arguments that is called by `trigger_callbacks`."""
        self._callbacks.append(callback)

    def trigger_callbacks(self):
        """Calls all callbacks stored with `register_callback`."""
        for callback in self._callbacks:
            callback()

    def set_variable_if_is_none_and_return_it(self, variable, value_func):
        """Returns `variable`, or the result of `value_func()` if `variable` is None.

        The result is not stored anywhere, the properties of this class use `_get_cached` instead.
        """
        if variable is None:
            variable = value_func()
        return variable

    def _get_cached(self, variable_name, value_func):
        """Returns the attribute `variable_name`; calculates it with `value_func` and stores it first if it is None."""
        value = getattr(self, variable_name, None)
        if value is None:
            value = value_func()
            setattr(self, variable_name, value)
        return value

    def register_variable(self, variable_name, value=None):
        """Creates the attribute `variable_name` with `value` and marks it to be reset to None by `reset_infert_data`."""
        setattr(self, variable_name, value)
        if variable_name not in self.__variables:
            self.__variables.append(variable_name)

    def reset_infert_data(self, update):
        """Sets all registered variables to None so they are recalculated on the next access.

        `update` is the argument passed by the ObservableList callback and is not used.
        """
        for variable_name in self.__variables:
            setattr(self, variable_name, None)

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._name = value
        self.reset_infert_data(None)

    @property
    def description(self):
        return self._description

    @description.setter
    def description(self, value):
        self._description = value
        self.reset_infert_data(None)

    @classmethod
    def load_from_record_dataframe(cls, record_data_df: pd.DataFrame, class_map=None, name=None, description=None):
        """Creates a dataset from a record dataframe; the class map is derived from the dataframe if none is given."""
        records = RecordDataframeParser(record_data_df).parse(RandomSplitter([1]))[0]
        if class_map is None:
            class_map = create_class_map_from_record_df(record_data_df)
        return cls(records, class_map=class_map, name=name, description=description)

    def load_from_file(self, path):
        """Sets records, class map, name and description from a json file with the keys written by `save`."""
        with open(path) as file:
            data = json.load(file)
        df = pd.DataFrame(data["data"])
        records = RecordDataframeParser(df).parse(RandomSplitter([1]))[0]

        self.records = ObservableList(records)
        # NOTE(review): `save` stores `class_map.id2class`; confirm that ClassMap does not add
        # a second background entry when it is rebuilt from that list.
        self.class_map = ClassMap(data["class_map"])
        self._name = data["name"]
        self._description = data["description"]

    def save(self, save_path):
        """Writes the dataset as json into the directory `save_path` (created if it does not exist).

        The file is called "<name>.json" ("dataset.json" for an empty name). If that file already
        exists, "<name>(1).json", "<name>(2).json", ... is used, so existing files are never overwritten.
        """
        os.makedirs(save_path, exist_ok=True)
        base_name = "dataset" if self.name == "" else self.name
        save_name = base_name + ".json"
        counter = 1
        # always derive the candidate from the base name, so the suffixes do not pile up
        while os.path.isfile(os.path.join(save_path, save_name)):
            save_name = base_name + "(" + str(counter) + ").json"
            counter += 1

        data = self.data
        class_map = self.class_map if self.class_map is not None else create_class_map_from_record_df(data)
        save_data = {"name": self.name, "description": self.description, "data": data.to_dict(), "class_map": class_map.id2class}

        # default=str serialises values json does not know (e.g. the dates) as strings
        with open(os.path.join(save_path, save_name), "w") as file:
            json.dump(save_data, file, default=str)

    def calculate_record_data(self):
        """Aggregates stats from a list of records and returns a pandas dataframe with one row per annotation.

        The creation time is not necessarily the real creation time.
        This depends on the OS, for more information see: https://docs.python.org/3/library/os.html#os.stat_result.
        Records without annotations do not produce rows. If there are no rows at all, an empty
        dataframe with the usual columns is returned.
        """
        rows = []
        for index, record in enumerate(self.records):
            annotations = list(zip(record["labels"], record["bboxes"]))
            if not annotations:
                continue
            # image level values are the same for every annotation of the record
            file_stats = record["filepath"].stat()
            creation_date = datetime.datetime.fromtimestamp(file_stats.st_ctime)
            modification_date = datetime.datetime.fromtimestamp(file_stats.st_mtime)
            image_area = record["width"] * record["height"]
            num_annotations = len(record["bboxes"])
            for label, bbox in annotations:
                bbox_width = bbox.xmax - bbox.xmin
                bbox_height = bbox.ymax - bbox.ymin
                area = bbox_width * bbox_height
                try:
                    bbox_ratio = bbox_width / bbox_height
                except ZeroDivisionError:
                    # degenerate box without height, there is no meaningful ratio
                    bbox_ratio = float("nan")
                rows.append(
                    {
                        "id": record["imageid"], "width": record["width"], "height": record["height"], "label": label,
                        "bbox_xmin": bbox.xmin, "bbox_xmax": bbox.xmax, "bbox_ymin": bbox.ymin, "bbox_ymax": bbox.ymax, "area": area,
                        "area_normalized": area / image_area, "bbox_ratio": bbox_ratio, "record_index": index, "bbox_width": bbox_width,
                        "bbox_height": bbox_height, "filepath": str(record["filepath"]), "creation_date": creation_date,
                        "modification_date": modification_date, "num_annotations": num_annotations
                    }
                )
        data = pd.DataFrame(rows, columns=list(self._RECORD_COLUMNS))
        # keep the raw label in "label_num"; "label" is mapped through the class map if there is one
        data["label_num"] = data["label"]
        if self.class_map is not None:
            data["label"] = data["label"].apply(self.class_map.get_id)
        return data

    @property
    def data(self):
        """Dataframe with one row per annotation (cached, see `calculate_record_data`)."""
        return self._get_cached("_data", self.calculate_record_data)

    def calculate_dataset_stats(self):
        """Creates a dict with summary values of the whole dataset."""
        data = self.data
        stats_dict = {}
        stats_dict["no_imgs"] = data["filepath"].nunique()
        stats_dict["no_classes"] = data["label"].nunique()
        stats_dict["classes"] = list(data["label"].unique())
        stats_dict["area_min"] = data["area"].min()
        stats_dict["area_max"] = data["area"].max()
        stats_dict["num_annotations_min"] = data["num_annotations"].min()
        stats_dict["num_annotations_max"] = data["num_annotations"].max()
        stats_dict["name"] = self._name
        stats_dict["description"] = self._description
        return stats_dict

    @property
    def dataset_stats(self):
        """Summary dict of the dataset; a copy is returned so callers can modify it without touching the cache."""
        stats = dict(self._get_cached("_dataset_stats", self.calculate_dataset_stats))
        stats["classes"] = list(stats["classes"])
        return stats

    def calculate_class_stats(self):
        """Creates a dataframe containing stats about the object classes."""
        data = self.data
        total_objects = data.shape[0]
        stats_dict = {}
        for label, group in data.groupby("label"):
            num_imgs = group["filepath"].nunique()
            num_objects = group.shape[0]
            stats_dict[label] = {
                "imgs": num_imgs,
                "objects": num_objects,
                "objects_per_img": num_objects/num_imgs,
                "frac_of_labels": round(num_objects/total_objects, 2),
            }
        df = pd.DataFrame(stats_dict).T
        return df.rename_axis('Class').reset_index()

    @property
    def class_stats(self):
        """Dataframe with one row per class (cached, see `calculate_class_stats`)."""
        return self._get_cached("_class_stats", self.calculate_class_stats)

    def calculate_image_stats(self):
        """Creates a dataframe containing stats about the images."""
        data = self.data
        stats_dict = {}
        stats_dict["Num. imgs."] = data["filepath"].nunique()
        stats_dict["Min Num. Objects"] = data["num_annotations"].min()
        stats_dict["Max Num. Objects"] = data["num_annotations"].max()
        stats_dict["Avg. Objects/Img"] = round(data["num_annotations"].mean(),2)
        return pd.DataFrame.from_dict(stats_dict, orient="index").T

    @property
    def image_stats(self):
        """Single row dataframe with image level stats (cached, see `calculate_image_stats`)."""
        return self._get_cached("_image_stats", self.calculate_image_stats)

In [None]:
# Smoke test: each derived property of a RecordDataset has the expected container type.
test_record_dataset = RecordDataset(test_valid_records, test_class_map)
assert isinstance(test_record_dataset.data, pd.DataFrame)
assert isinstance(test_record_dataset.dataset_stats, dict)
assert isinstance(test_record_dataset.class_stats, pd.DataFrame)
assert isinstance(test_record_dataset.image_stats, pd.DataFrame)

In [None]:
# The name and description setters store the assigned values.
test_record_dataset.name = "Test"
assert test_record_dataset.name == "Test"
test_record_dataset.description = "A short description"
assert test_record_dataset.description == "A short description"

In [None]:
# Replacing the records through the observable wrapper has to be reflected in the dataset stats.
test_old_record_dataset_stats = test_record_dataset.dataset_stats
test_record_dataset.records.observable_list = test_train_records
test_new_record_dataset_stats = test_record_dataset.dataset_stats
assert test_old_record_dataset_stats != test_new_record_dataset_stats

In [None]:
# Round trip: a dataset rebuilt from the record dataframe has as many records as the original one.
test_regenerated_record_dataset = RecordDataset.load_from_record_dataframe(test_record_dataset.data)
assert len(test_regenerated_record_dataset.records) == len(test_record_dataset.records)

In [None]:
# Save / load round trip in a scratch directory that is removed again at the end.
shutil.rmtree("dump_dir", ignore_errors=True)
os.mkdir("dump_dir")
# an empty name makes save fall back to the file name "dataset"
test_record_dataset.name = ""
# saving twice must create a second file instead of overwriting the first one
test_record_dataset.save("dump_dir")
test_record_dataset.save("dump_dir")
assert len(os.listdir("dump_dir")) == 2
assert os.path.isfile("dump_dir/dataset.json")
# a path as first argument loads the dataset from the file
test_loaded_record_dataset = RecordDataset("dump_dir/dataset.json")
# only the shapes are compared, the sort order has no influence on them
assert test_record_dataset.data.sort_values("area").shape == test_loaded_record_dataset.data.sort_values("id").shape
shutil.rmtree("dump_dir")

In [None]:
#export
def calculate_mixing_matrix(data, mixing_col, mixing_objects, return_df=True):
    """Calculates how often the values of the `mixing_objects` column occur together within the same `mixing_col` value.

    With `mixing_col="filepath"` and `mixing_objects="label"` this is the object class mixing matrix
    over the images. Entry (i, j) with i != j counts the groups that contain both values. The
    diagonal entry (i, i) counts the groups that contain value i more than once. Each group adds at
    most 1 to an entry.

    Returns a tuple `(result, mapping)`, where `mapping` maps each value of `mixing_objects` to its
    matrix index. If `return_df` is True (default) `result` is a dataframe with the columns
    "values", "col_name" and "row_name" that can be directly consumed by histogram_2d, otherwise
    it is the mixing matrix itself.
    """
    # map the (sorted) values to their mixing matrix index
    object_values = np.sort(data[mixing_objects].unique())
    mapping = {value: index for index, value in enumerate(object_values)}
    mixing_matrix = np.zeros([len(mapping), len(mapping)])
    # iterate over the rows that share the same mixing_col value
    for _, group in data.groupby(mixing_col):
        # self mixing: the value occurs more than once, only the diagonal cell is affected
        for value, count in group[mixing_objects].value_counts().items():
            if count > 1:
                index = mapping[value]
                mixing_matrix[index, index] += 1
        # mixing of different values, every ordered pair is counted once per group
        present_values = group[mixing_objects].unique()
        for first in present_values:
            for second in present_values:
                # equal pairs are already handled by the self mixing
                if first != second:
                    mixing_matrix[mapping[first], mapping[second]] += 1
    if return_df:
        names = list(mapping)
        df_dict = {"values": [], "col_name": [], "row_name": []}
        # flatten the matrix row by row
        for row_name, row in zip(names, mixing_matrix):
            df_dict["values"] += row.tolist()
            df_dict["row_name"] += [row_name]*len(names)
            df_dict["col_name"] += names
        return pd.DataFrame(df_dict), mapping
    return mixing_matrix, mapping

Objects that occur together in the same image are counted only once per image, not multiple times.

In [None]:
# test without class_map
test_mixing_matrix_df, test_mixing_matrix_mapping = calculate_mixing_matrix(test_record_dataset.data, mixing_col="filepath", mixing_objects="label")
# test class_map
test_mixing_matrix_df, test_mixing_matrix_mapping = calculate_mixing_matrix(test_record_dataset.data, mixing_col="filepath", mixing_objects="label")

In [None]:
# Preview of the flattened mixing matrix (columns "values", "col_name" and "row_name").
test_mixing_matrix_df.head()

## Plotting

In [None]:
#export
def convert_rgb_image_to_bokeh_rgb_image(img: np.ndarray):
    """Converts an RGB image given as a (height, width, channels) numpy array to an array that can be shown by bokeh.

    The image is flipped upside down and cast to uint8. The first three channels are used as red,
    green and blue, the alpha channel is always set to 255. The result is a (height, width) uint32
    array in which every value packs the four bytes of one pixel.
    """
    flipped = np.flipud(img).astype(np.uint8)
    num_rows, num_cols = flipped.shape[0], flipped.shape[1]
    # build the four bytes per pixel first and reinterpret them as one 32 bit value afterwards
    rgba = np.empty((num_rows, num_cols, 4), dtype=np.uint8)
    for channel in range(3):
        rgba[:, :, channel] = flipped[:, :, channel]
    rgba[:, :, 3] = 255
    return rgba.view(dtype=np.uint32).reshape((num_rows, num_cols))

Bokeh requires images in a (height, width) format where each value is a 32-bit integer whose four 8-bit sequences contain the red, green, blue and alpha values.

In [None]:
#hide
# A random 10x10 RGB image has to become a 10x10 array of packed uint32 values.
img = np.random.randint(0, 256, [10,10,3], dtype=np.uint8)
bokeh_img = convert_rgb_image_to_bokeh_rgb_image(img)
assert bokeh_img.shape == (10,10)
assert bokeh_img.dtype == np.uint32

In [None]:
#export
def draw_record_with_bokeh(
    record,
    class_map=None,
    display_label=True,
    display_bbox=False,
    display_mask=False,
    display_keypoints=False,
    return_figure=False,
    width=None,
    height=None
):
    """Renders a record with `draw_record` and puts the image into a bokeh figure.

    The figure is returned if `return_figure` is True, otherwise it is shown and None is returned.
    If only one of `width` and `height` is given, the other one is scaled so that the aspect ratio
    of the image is kept. If neither is given the image size is used, if both are given they are
    used as they are.
    """
    img = draw_record(
        record=record,
        class_map=class_map,
        display_label=display_label,
        display_bbox=display_bbox,
        display_mask=display_mask,
        display_keypoints=display_keypoints,
    )
    bokeh_img = convert_rgb_image_to_bokeh_rgb_image(img)
    img_height, img_width = img.shape[0], img.shape[1]

    # figure size, the missing side is derived from the aspect ratio of the image
    if width is None and height is None:
        plot_width, plot_height = img_width, img_height
    elif width is None:
        plot_width, plot_height = int(img_width/img_height * height), height
    elif height is None:
        plot_width, plot_height = width, int(img_height/img_width * width)
    else:
        plot_width, plot_height = width, height

    # the y range is reversed, so the origin is in the upper left corner like in image coordinates
    p = figure(tools="reset, wheel_zoom, box_zoom, save, pan", width=plot_width, height=plot_height, x_range=(0, img_width), y_range=(img_height, 0), x_axis_location="above")
    p.xgrid.grid_line_color = None
    p.ygrid.grid_line_color = None
    p.image_rgba([bokeh_img], x=0, y=img_height, dw=img_width, dh=img_height, level="image")
    if not return_figure:
        show(p)
        return None
    return p

Wrapper around the `draw_record` function from icevision. The aspect ratio of the image will be preserved when only width or height is given (scaling the other accordingly).

In [None]:
# Only the width is given, so the height follows from the aspect ratio of the image.
draw_record_with_bokeh(test_train_records[0], width=200)

In [None]:
#export
def barplot(counts, values, class_map=None, bar_type="horizontal", width=500, height=500):
    """Creates a figure with a barplot, where `counts` are the bar lengths and `values` are the labels of the bars.

    Without a `class_map` the values are converted to strings, otherwise each value is translated
    with `class_map.get_id`. `bar_type` is either "horizontal" or "vertical", any other value
    raises a ValueError.
    """
    if bar_type not in ("horizontal", "vertical"):
        raise ValueError("bar_type has to be 'horizontal' or 'vertical'")
    if class_map is None:
        values = [str(entry) for entry in values]
    else:
        values = [class_map.get_id(entry) for entry in values]
    # the labels are used as categorical range of the axis the bars start from
    if bar_type == "horizontal":
        p = figure(width=width, height=height, y_range=values)
        p.hbar(y=values, left=0, right=counts, height=0.9)
    else:
        p = figure(width=width, height=height, x_range=values)
        p.vbar(x=values, bottom=0, top=counts, width=0.9)
    return p

In [None]:
# test barplot without a class map (horizontal bars, the values are used as labels)
p = barplot([10, 20], [1,2])
pn.Row(p)

In [None]:
# test barplot with a class map (vertical bars, the values are translated by the class map)
hist = [[1, 10], [2, 20]]
cls_map = ClassMap(["a", "b"])
p = barplot([10, 20], [1, 2], cls_map, bar_type="vertical")
pn.Row(p)

In [None]:
#export
def histogram(values, bins=10, range=None, density=False, plot_figure=None, remove_tools=False, width=500, height=500):
    """Draws a histogram of `values` and returns the figure.

    `bins`, `range` and `density` are passed to `np.histogram`. The bars are added to `plot_figure`
    if one is given, otherwise a new figure with the given `width` and `height` is created.
    With `remove_tools` the toolbar and its logo are hidden.
    """
    target = figure(width=width, height=height) if plot_figure is None else plot_figure
    counts, edges = np.histogram(values, bins=bins, range=range, density=density)
    # every bin becomes a rectangle from its left to its right edge
    left_edges, right_edges = edges[:-1], edges[1:]
    target.quad(top=counts, bottom=0, left=left_edges, right=right_edges)
    if remove_tools:
        target.toolbar.logo = None
        target.toolbar_location = None
    return target

In [None]:
# Histogram in a newly created figure, normalised with density=True.
pn.Row(histogram([1,1,1,1,2,2,2,3,3,4], bins=4, range=(1,4), density=True))

In [None]:
# Histogram drawn into an existing figure that is passed as plot_figure.
p = figure(title="Test")
p = histogram([1,1,1,1,2,2,2,3,3,4], bins=4, plot_figure=p)
pn.Row(p)

In [None]:
#export
def histogram_2d(df, x, y, values, color_mapper=None, height=500, width=500, title="Classes in image mixing matrix"):
    """Creates a heatmap figure with one coloured cell for each row of `df`.

    `x` and `y` are the names of the columns with the categories of the two axes, `values` is the
    name of the column that determines the colour of a cell and is shown in the hover tooltip.
    Without a `color_mapper` a linear Viridis mapper over the value range of `df[values]` is used.
    `title` is the title of the figure. The given dataframe is not modified.
    """
    if color_mapper is None:
        color_mapper = LinearColorMapper(palette=Viridis[256], low=df[values].min(), high=df[values].max())

    # the axes are categorical, so the x and y column are converted to strings (on a copy)
    df = df.copy()
    df[x] = df[x].astype(str)
    df[y] = df[y].astype(str)

    p = figure(
        title=title, x_range=sorted(df[x].unique())[::-1], y_range=sorted(df[y].unique()),
        x_axis_location="above", tools="hover", toolbar_location=None, tooltips=[('Mixing images: ', '@'+values)],
        width=width, height=height
    )

    # hide grid, axis lines and ticks, only the labels of the categories remain
    p.grid.grid_line_color = None
    p.axis.axis_line_color = None
    p.axis.major_tick_line_color = None
    p.axis.major_label_standoff = 0

    p.rect(
        x=x, y=y, width=1, height=1, source=df,
        fill_color={'field': values, 'transform': color_mapper},
        line_color=None
    )

    color_bar = ColorBar(
        color_mapper=color_mapper, major_label_text_font_size="7px",
        label_standoff=6, border_line_color=None, location=(0, 0)
    )
    p.add_layout(color_bar, 'right')

    return p

In [None]:
# Heatmap of the mixing matrix dataframe calculated above.
pn.Column(histogram_2d(test_mixing_matrix_df, "row_name", "col_name", "values"))

In [None]:
#export
def time_arc_plot(start_date, end_date, plot_figure=None, width=500, height=300):
    """Draws one half circle per (start_date, end_date) pair and returns the figure.

    Each arc is centred in the middle between start and end on the y=0 line and its radius is half
    of the time span. The arcs are added to `plot_figure` if one is given, otherwise a new figure
    with a datetime x axis and without y tick labels and toolbar is created.
    """
    half_span = (end_date-start_date)/2
    centers = start_date + half_span
    if plot_figure is not None:
        p = plot_figure
    else:
        p = figure(x_axis_label="Date", x_axis_type='datetime', y_range=(0, half_span.max()), x_range=(start_date.min(), end_date.max()), width=width, height=height)
        # the y axis carries no information, so its ticks and labels are hidden
        p.yaxis.major_tick_line_color = None
        p.yaxis.minor_tick_line_color = None
        p.yaxis.major_label_text_font_size = '0pt'
        p.toolbar.logo = None
        p.toolbar_location = None
    p.arc(x=centers, y=0, radius=half_span, start_angle=0, end_angle=np.pi)
    return p

In [None]:
# The modification dates are shifted by a random number of days (0 to 49) per row,
# presumably so that the arcs get a visible radius. The offsets are not seeded.
random_time_delta = pd.DataFrame([datetime.timedelta(days=np.random.randint(0, 50)) for _ in range(test_record_dataset.data.shape[0])], columns=["t_delta"])
pn.Row(time_arc_plot(test_record_dataset.data["creation_date"], test_record_dataset.data["modification_date"]+random_time_delta["t_delta"]))

In [None]:
#export
def table_from_dataframe(dataframe, columns=None, width=500, height=200, index_position=None):
    """Creates a bokeh DataTable from a dataframe.

    With `columns` only the given columns are shown. The column titles are the column names with
    underscores replaced by blanks and in title case. `index_position` is set on the table as it is.
    """
    selection = dataframe if columns is None else dataframe[columns]
    source = ColumnDataSource(selection)
    table_cols = []
    for column_name in selection.columns:
        column_title = column_name.replace("_", " ").title()
        table_cols.append(TableColumn(field=column_name, title=column_title))
    data_table = DataTable(source=source, columns=table_cols, width=width, height=height)
    data_table.index_position = index_position
    return data_table

In [None]:
# Table with all columns of the record dataframe.
pn.Row(table_from_dataframe(test_record_dataset.data, width=1000))

In [None]:
#export
def create_datasets_overview(datasets, cols=None, width=500, height=100):
    """Creates an overview table for a list of record datasets.

    Each dataset contributes one row with its `dataset_stats`, where the list of classes is joined
    into a single comma separated string. With `cols` the columns of the resulting table can be
    chosen. The stats of the datasets themselves are not modified.
    """
    dataset_stats_list = []
    for dataset in datasets:
        # work on a copy, the dict handed out by the dataset must not be changed
        dataset_stats = dict(dataset.dataset_stats)
        dataset_stats["classes"] = ", ".join(str(entry) for entry in dataset_stats["classes"])
        if cols is not None:
            dataset_stats = {key: value for key, value in dataset_stats.items() if key in cols}
        dataset_stats_list.append(dataset_stats)
    dataset_stats_df = pd.DataFrame(dataset_stats_list)

    # the cell content is repeated as tooltip, so long values can be read completely
    template = """<span href="#" data-toggle="tooltip" title="<%= value %>"><%= value %></span>"""
    table = pnw.DataFrame(dataset_stats_df, formatters={key: HTMLTemplateFormatter(template=template) for key in dataset_stats_df.columns}, selection=[0], width=width, height=height)
    return table

In [None]:
# Overview restricted to the columns "no_imgs" and "classes".
create_datasets_overview([test_record_dataset, test_record_dataset, test_record_dataset], cols=["no_imgs", "classes"])

In [None]:
# Overview with all columns of the dataset stats.
create_datasets_overview([test_record_dataset, test_record_dataset, test_record_dataset])

## Control elements

In [None]:
#export
def generate_range_filter(data, name, with_hist=True, steps=50, height=500, width=500):
    """Generates a range slider with a histogram (if with_hist is True) for the values in `data`.

    `data` is a column of values (e.g. a pd.Series) that provides `min()` and `max()`, `name` is
    the label of the slider and `steps` the approximate number of slider steps over the value
    range. `height` is currently not used. Returns a panel Column with the slider, the histogram
    (or None) and a line break.
    """
    val_min = data.min()
    val_max = data.max()
    # subtract and add a bit to the min and max value to ensure the whole range is captured
    dist = val_max-val_min if val_max != val_min else 1
    val_min = val_min-0.01*dist
    val_max = val_max+0.01*dist
    step = (val_max-val_min)/steps
    # rounding to one decimal gives 0 for small value ranges, the exact step is kept in that case
    rounded_step = round(step, 1)
    if rounded_step > 0:
        step = rounded_step
    slider = pnw.RangeSlider(name=name, start=val_min, end=val_max, step=step, width=int(0.97*width))
    if with_hist:
        hist = histogram(data, bins=20, height=100, width=width, remove_tools=True)
    else:
        hist = None
    range_filter = pn.Column(slider, hist, "<br>")
    return range_filter

In [None]:
# Range filter (slider with histogram) for the bounding box areas.
generate_range_filter(test_record_dataset.data["area"], "Area", width=600)

In [None]:
#export
def get_min_and_max_dates(dates):
    """Returns the earliest and the latest date of `dates` as datetimes truncated to midnight.

    `dates` has to provide `min()` and `max()` returning objects with `to_pydatetime()` (e.g. a
    pd.Series of timestamps). If both fall on the same day, the maximum is moved to the next day,
    so the two values are always at least one day apart.
    """
    midnight = dict(microsecond=0, second=0, minute=0, hour=0)
    min_date = dates.min().to_pydatetime().replace(**midnight)
    max_date = dates.max().to_pydatetime().replace(**midnight)
    # make sure the min and max values are at least a day apart
    if min_date == max_date:
        # a timedelta also works on the last day of a month or year
        max_date = max_date + datetime.timedelta(days=1)
    return min_date, max_date

In [None]:
#export
def generate_creation_modification_time_filter(data, width=500, height=500):
    """Generates an arc plot of the creation and modification times with one date range slider for each of them.

    `data` needs the columns "creation_date" and "modification_date". Both sliders share the same
    range, from the earliest to the latest day found in the two columns. `height` is currently
    not used.
    """
    plot = time_arc_plot(data["creation_date"], data["modification_date"], width=width)
    # (min, max) day of each of the two date columns
    date_bounds = [get_min_and_max_dates(data[column]) for column in ("creation_date", "modification_date")]
    min_date = min(bounds[0] for bounds in date_bounds)
    max_date = max(bounds[1] for bounds in date_bounds)
    sliders = [
        pnw.DateRangeSlider(name=slider_name, start=min_date, end=max_date, width=width)
        for slider_name in ("Creation Time", "Modification Time")
    ]
    return pn.Column(plot, *sliders)

In [None]:
# Work on a copy of the record data and shift the modification dates by the random offsets
# defined in the time arc plot cell above.
test_time_selection = test_record_dataset.data.copy()
test_time_selection["modification_date"] += random_time_delta["t_delta"]
generate_creation_modification_time_filter(test_time_selection, width=200)