In [1]:
import json
import threading
import time
from datetime import datetime, timedelta
from typing import Tuple

import numpy as np
import pandas as pd
from ipywidgets import widgets
from numpy.lib.stride_tricks import sliding_window_view
from scipy import stats


# --- Configuration ---------------------------------------------------------
# Extra time searched on each side of the sampling window when slicing donors.
surrounding_duration = timedelta(days=15)

# --- Module state shared between notebook callbacks and the UI thread ------
cached_donors = None            # filename -> DataFrame, filled by load_donors_in_cache
donors = None                   # donor filenames picked in hotdeck_prehook
gaps_done = 0                   # progress counter, polled by run_ui
column_name = None              # target column, chosen in hotdeck_prehook
running = True                  # cleared to stop run_ui when imputation fails
imputing_classification = False # set by hotdeck_classification; read by scan_donor/impute_gap
hd_donors_selector = None       # donor SelectMultiple widget
hd_confirm_button = None        # "Confirm selection" button widget


def hotdeck_classification(receiver: pd.DataFrame, config: object) -> Tuple[pd.DataFrame, str]:
    """Run :func:`hotdeck` in classification mode and round the imputed column.

    Classification mode makes ``scan_donor`` match donors without a vertical
    offset and makes ``impute_gap`` skip quantile trimming. The imputed target
    column is rounded to 0 decimals (presumably integer class labels — confirm).

    The global ``imputing_classification`` flag is restored to its previous
    value on exit, including on error, so a later plain ``hotdeck`` call is not
    silently run in classification mode.

    Returns:
        The imputed frame and the JSON config string produced by ``hotdeck``.
    """
    global imputing_classification
    previous_mode = imputing_classification
    imputing_classification = True
    try:
        imputed, imputer_config = hotdeck(receiver, config)
    finally:
        imputing_classification = previous_mode
    imputed[column_name] = imputed[column_name].round()
    return imputed, imputer_config


def hotdeck(receiver: pd.DataFrame, config: object) -> Tuple[pd.DataFrame, str]:
    """Impute every gap of ``receiver`` from the best-matching cached donor.

    Loads the selected donors into the cache on first use. It then imputes a
    copy of ``receiver``, while a background thread (``run_ui``) polls
    ``gaps_done`` to show progress.

    Args:
        receiver: time-indexed frame containing the target column ``column_name``.
        config: mapping providing ``"current_gap_indices"``, one list of
            timestamps per gap.

    Returns:
        The imputed copy of ``receiver``, and a JSON string recording the donor
        files and ``surrounding_duration`` used.

    Raises:
        Whatever ``impute_gaps`` raises; the progress thread is stopped and
        joined first.
    """
    global donors, gaps_done, running

    custom_progress_status = widgets.HTML()
    display(custom_progress_status)

    if cached_donors is None:
        load_donors_in_cache(donors, custom_progress_status)

    gaps_done = 0
    # Re-arm the progress thread: a previous failed run leaves ``running``
    # False, which would make run_ui return immediately and show nothing.
    running = True
    receiver_cp = receiver.copy()
    gap_indices = config["current_gap_indices"]

    ui_thread = threading.Thread(target=run_ui, args=(len(gap_indices), custom_progress_status))
    ui_thread.start()
    try:
        impute_gaps(receiver_cp, gap_indices)
    except BaseException:
        # gaps_done will never reach the target; tell run_ui to stop polling.
        running = False
        raise
    finally:
        ui_thread.join()

    return receiver_cp, json.dumps({
        'donors': donors,
        'surrounding_duration': str(surrounding_duration)
    })


def impute_gaps(receiver: pd.DataFrame, gap_indices: [[datetime]]) -> None:
    """Impute each gap of ``receiver`` in place from its best-scoring donor.

    For every gap:
    - locate the samples bracketing it;
    - measure how much valid data surrounds it;
    - score every cached donor over that window, widened by
      ``surrounding_duration`` on both sides, with ``scan_donor``;
    - copy the lowest-scoring donor into the gap.

    Finally, anything still missing is filled by time interpolation. The
    global ``gaps_done`` is incremented after each gap for the progress thread.

    Args:
        receiver: time-indexed frame, modified in place.
        gap_indices: one list of timestamps per gap, in order.
    """
    global gaps_done

    for gap in gap_indices:
        gap_start_idx, gap_end_idx = get_gap_boundaries(receiver, gap[0], gap[-1])
        gap_start_time = receiver.index[gap_start_idx]
        gap_end_time = receiver.index[gap_end_idx]

        duration_before, duration_after = get_sampling_durations(
            receiver, gap_start_idx, gap_end_idx, gap_start_time, gap_end_time)

        before = get_normalized_dataframe(receiver, gap_start_time - duration_before, gap_start_time)
        after = get_normalized_dataframe(receiver, gap_end_time, gap_end_time + duration_after)

        donor_start_time = gap_start_time - (duration_before + surrounding_duration)
        donor_end_time = gap_end_time + (duration_after + surrounding_duration)

        scoreboard = []
        for file in donors:
            donor = get_donor(file, donor_start_time, donor_end_time)
            if len(donor.index) == 0:
                continue
            try:
                scoreboard.append(scan_donor(before.copy(), after.copy(), file, donor))
            except Exception:
                # Best effort: a donor that cannot be scanned (e.g. too short
                # for the sliding window) is skipped. Interrupts and SystemExit
                # are no longer swallowed.
                continue
        if scoreboard:
            best = min(scoreboard, key=lambda entry: entry["score"])
            impute_gap(best, receiver, gap)
        gaps_done += 1
    receiver.interpolate(method="time", limit_direction="both", inplace=True, downcast='infer')


def scan_donor(before: pd.DataFrame, after: pd.DataFrame, donor_filename: str, donor: pd.DataFrame) -> dict:
    """Find where in ``donor`` the receiver's context around a gap fits best.

    A candidate is a window with ``len(before)`` samples from the donor's
    "before" part, followed by ``len(after)`` samples from its "after" part.
    The two parts are offset in time by the receiver's gap plus the window
    lengths. Windows are taken by sample count, so this assumes roughly
    regular sampling.

    Outside classification mode, each candidate is shifted vertically so its
    mean equals the receiver's. Candidates are scored by the sum of absolute
    differences.

    Args:
        before, after: receiver samples just before / after the gap.
        donor_filename: unused; kept for interface compatibility.
        donor: candidate donor series covering the search range.

    Returns:
        dict with ``score`` (lower is better), ``x_offset`` (time shift that
        maps the donor onto the receiver), ``y_offset`` (vertical shift) and
        the unshifted ``donor``.

    Raises:
        ValueError / IndexError when the donor is too short for the window.
    """
    global column_name, imputing_classification

    keys = np.concatenate([before[column_name].values, after[column_name].values])

    gap_size = after.index[0] - before.index[-1]
    before_size = before.index[-1] - before.index[0]
    after_size = after.index[-1] - after.index[0]

    donor_before = get_normalized_dataframe(donor, donor.index[0], donor.index[-1] - (after_size + gap_size))
    donor_after = get_normalized_dataframe(donor, donor.index[0] + (gap_size + before_size), donor.index[-1])

    sliding_before = sliding_window_view(donor_before[column_name].values, window_shape=len(before.index))
    sliding_after = sliding_window_view(donor_after[column_name].values, window_shape=len(after.index))

    length = min(len(sliding_before), len(sliding_after))
    candidates = np.concatenate(
        [sliding_before[:length], sliding_after[:length]], axis=1).astype(np.float64)

    if imputing_classification:
        # Class labels must not be shifted.
        y_offsets = np.zeros(length)
    else:
        y_offsets = keys.mean() - candidates.mean(axis=1)
        candidates += y_offsets[:, np.newaxis]

    scores = np.abs(candidates - keys).sum(axis=1)
    best = int(np.argmin(scores))  # raises ValueError when there is no candidate

    return {
        "score": scores[best],
        # Window ``best`` starts at donor_before.index[best]. That can be the
        # synthesized end row, which is not a row of ``donor`` itself.
        "x_offset": before.index[0] - donor_before.index[best],
        "y_offset": y_offsets[best],
        "donor": donor
    }


def impute_gap(best: dict, receiver: pd.DataFrame, gap_indices: [datetime]) -> None:
    """Copy the donor match chosen by ``scan_donor`` into one gap of ``receiver``.

    Steps:
    1. Shift the donor by ``x_offset`` in time and ``y_offset`` in value.
    2. Outside classification mode, drop its values outside the open
       (5 %, 95 %) quantile range.
    3. Time-interpolate the donor at every gap timestamp it lacks.
    4. Write those values into ``receiver`` in place.

    Args:
        best: result dict from ``scan_donor``.
        receiver: time-indexed frame, modified in place.
        gap_indices: timestamps of the gap to fill.
    """
    global column_name, imputing_classification
    donor = best["donor"].copy()
    donor.index += best["x_offset"]
    donor[column_name] += best["y_offset"]
    if not imputing_classification:
        column = donor[column_name]
        inner = (column > column.quantile(0.05)) & (column < column.quantile(0.95))
        # A (near-)constant donor loses every row to the strict bounds; keep it
        # untrimmed instead of imputing nothing but NaN.
        if inner.any():
            donor = donor[inner].copy()
    for gap_idx in gap_indices:
        if gap_idx not in donor.index:
            donor.loc[gap_idx] = np.nan
    donor.sort_index(key=lambda t: t.astype(np.int64), inplace=True)
    # No ``downcast``: it is deprecated and only changed dtype, not values.
    donor.interpolate(method="time", limit_direction="both", inplace=True)
    for gap_idx in gap_indices:
        # Single .loc write: chained ``receiver[col][idx] = ...`` does not
        # modify ``receiver`` under pandas Copy-on-Write.
        receiver.loc[gap_idx, column_name] = donor.loc[gap_idx, column_name]


def hotdeck_prehook(next_step):
    """Pick the target column, then let the user choose donor files.

    When donors are already cached, ``next_step`` is called right away.
    Otherwise a multi-select of compatible uploaded files is displayed, along
    with a confirm button. Clicking the button stores the selection in the
    global ``donors`` and then calls ``next_step``.
    """
    global file_select, dfloader, column_name, hd_donors_selector, hd_confirm_button

    column_name = dfloader.targets[0]

    if cached_donors is None:
        candidate_files = filter_compatible_files(file_select.options)
        hd_donors_selector = widgets.SelectMultiple(
            options=candidate_files,
            description="Select donors: ",
            rows=len(candidate_files),
        )
        hd_confirm_button = widgets.Button(description="Confirm selection")
        for widget in (hd_donors_selector, hd_confirm_button):
            display(widget)

        def on_confirm(_click_event):
            global donors, hd_confirm_button, hd_donors_selector
            # Lock the choice in: remove the button and freeze the selector.
            hd_confirm_button.close()
            hd_donors_selector.disabled = True
            donors = hd_donors_selector.value
            next_step()

        hd_confirm_button.on_click(on_confirm)
    else:
        next_step()


def filter_compatible_files(files: [str]) -> [str]:
    """Return the uploaded files that can serve as donors for the current target.

    Any name containing ``.csv`` is accepted as-is. Every other file is opened
    as an Excel workbook (openpyxl) from ``config["upload_dir"]``, and kept
    only if it has the target's sheet (``dfloader.targets_index[0][0]``).
    """
    global dfloader
    sheet_name = dfloader.targets_index[0][0]

    def _has_target_sheet(filename: str) -> bool:
        # Context manager releases the workbook's file handle after the check.
        with pd.ExcelFile(config["upload_dir"] + filename, engine='openpyxl') as workbook:
            return sheet_name in workbook.sheet_names

    return [file for file in files if ".csv" in file or _has_target_sheet(file)]


def get_normalized_dataframe(df: pd.DataFrame, start_time: datetime, end_time: datetime) -> pd.DataFrame:
    """Slice ``df`` to exactly ``[start_time, end_time]``, synthesizing missing boundaries.

    If no row sits exactly on ``start_time`` or ``end_time``, a row is
    inserted there. Its values are time-interpolated from the neighbouring
    samples, or held constant when the boundary lies outside the data.

    Args:
        df: frame with a sorted DatetimeIndex.
        start_time, end_time: inclusive bounds of the result.

    Returns:
        A new frame whose first/last index values are ``start_time``/``end_time``.

    Raises:
        KeyError: if no row of ``df`` lies at or after ``end_time``.
    """
    index = df.index
    clamped_start = start_time if start_time >= index[0] else index[0]
    # ``Index.get_loc(key, method)`` was removed in pandas 2.0; get_indexer
    # does the same nearest-label lookup and returns -1 when nothing matches.
    start_idx = index.get_indexer([clamped_start], method='pad')[0]
    end_idx = index.get_indexer([end_time], method='bfill')[0]
    if end_idx == -1:
        raise KeyError(end_time)
    cp = df[(index >= index[start_idx]) & (index <= index[end_idx])].copy()

    start_time_missing = cp.index[0] != start_time
    end_time_missing = cp.index[-1] != end_time

    if start_time_missing:
        cp.loc[start_time] = np.nan
    if end_time_missing:
        cp.loc[end_time] = np.nan
    if start_time_missing or end_time_missing:
        cp.sort_index(key=lambda t: t.astype(np.int64), inplace=True)
        # No ``downcast`` (deprecated since pandas 2.1): it only changed dtype.
        cp.interpolate(method="time", limit_direction="both", inplace=True)
    return cp[(cp.index >= start_time) & (cp.index <= end_time)]


def get_donor(filepath: str, start_time: datetime = None, end_time: datetime = None) -> pd.DataFrame:
    """Return a copy of the cached donor ``filepath``, restricted to a time range.

    Args:
        filepath: key into ``cached_donors``.
        start_time: inclusive lower bound; ``None`` means unbounded.
        end_time: inclusive upper bound; ``None`` means unbounded.

    Returns:
        A new DataFrame; mutating it never touches the cache.
    """
    global cached_donors
    donor = cached_donors[filepath]
    # Build one mask and copy only the selected rows, not the whole series.
    keep = np.ones(len(donor.index), dtype=bool)
    if start_time is not None:
        keep &= donor.index >= start_time
    if end_time is not None:
        keep &= donor.index <= end_time
    return donor[keep].copy()


def load_donors_in_cache(donors: [str], custom_progress_status: widgets.HTML):
    """Load every donor file and cache its target column with NaN rows dropped.

    Each file is read from ``config["upload_dir"]`` with the dataset's date
    parser. The same sheet/column as the receiver target
    (``dfloader.targets_index[0]``) is selected.

    Args:
        donors: donor filenames to load.
        custom_progress_status: HTML widget used for a status message.
    """
    global dfloader, cached_donors, dataset_config
    custom_progress_status.value = f"Hotdeck starting, loading {len(donors)} donors..."
    sheet_name, target_column = dfloader.targets_index[0]
    loaded = {}
    for filename in donors:
        loader = DataFrameLoader.from_file(config["upload_dir"] + filename, date_parser=dataset_config["date_parser"])
        loader.add_targets(target_column, sheet_name=sheet_name)
        loaded[filename] = loader.df.dropna()
    # Publish only after every file loaded. hotdeck and hotdeck_prehook only
    # test the cache against None, so a partial cache would look complete.
    cached_donors = loaded


def get_gap_boundaries(df: pd.DataFrame, gap_start_time: datetime, gap_end_time: datetime) -> tuple:
    """Return the positions of the samples just outside a gap, clamped to ``df``.

    Args:
        df: frame whose index contains both gap timestamps.
        gap_start_time, gap_end_time: first and last timestamps inside the gap.

    Returns:
        ``(position before the gap, position after the gap)``. Each is clamped
        to the first/last row when the gap touches an edge of ``df``.
    """
    last_position = len(df) - 1
    first_in_gap = df.index.get_loc(gap_start_time)
    last_in_gap = df.index.get_loc(gap_end_time)
    return max(first_in_gap - 1, 0), min(last_in_gap + 1, last_position)


def get_sampling_durations(receiver: pd.DataFrame, gap_start_idx: int, gap_end_idx: int, gap_start_time: datetime, gap_end_time: datetime) -> tuple:
    """Measure how far valid (non-NaN) data extends on each side of a gap.

    Walks outward from the bracketing samples. A walk stops at the first NaN,
    at the frame edge, or once the covered duration reaches
    ``max(gap duration, 20 minutes)``. Both frame edges are eligible:
    position 0 as well as the last position.

    Args:
        receiver: time-indexed frame holding ``column_name``.
        gap_start_idx, gap_end_idx: positions of the samples bracketing the gap.
        gap_start_time, gap_end_time: timestamps at those positions.

    Returns:
        ``(duration_before, duration_after)`` as timedeltas.
    """
    global column_name
    gap_duration = gap_end_time - gap_start_time
    max_duration = max(gap_duration, timedelta(minutes=20))
    values = receiver[column_name].values
    timestamps = receiver.index
    index_count = len(timestamps)

    duration_before = timedelta(seconds=0)
    position = gap_start_idx
    while position >= 0 \
            and duration_before < max_duration \
            and not np.isnan(values[position]):
        duration_before = gap_start_time - timestamps[position]
        position -= 1

    duration_after = timedelta(seconds=0)
    position = gap_end_idx
    while position < index_count \
            and duration_after < max_duration \
            and not np.isnan(values[position]):
        duration_after = timestamps[position] - gap_end_time
        position += 1

    return duration_before, duration_after


def run_ui(gap_indices_count: int, custom_progress_status: widgets.HTML):
    """Show a progress bar and refresh it every 0.5 s until all gaps are done.

    Meant to run on its own thread while ``gaps_done`` is advanced elsewhere.
    Returns early, without the final summary, once the global ``running`` is
    cleared. On normal completion the status widget shows the total time taken.
    """
    global gaps_done
    started_at = datetime.now()
    bar = widgets.IntProgress(min=0, max=gap_indices_count, value=0)
    display(bar)
    while True:
        if gaps_done == bar.max:
            break
        if not running:
            return
        display_progress(started_at, custom_progress_status, bar)
        time.sleep(0.5)
    display_progress(started_at, custom_progress_status, bar)
    total_taken = datetime.now() - started_at
    custom_progress_status.value = f"Time taken: {total_taken}"


def display_progress(start_time: datetime, custom_progress_status: widgets.HTML, progress_bar) -> None:
    """Update the progress bar and the "ETA (elapsed)" status text.

    The ETA is the average time per finished gap times the gaps remaining,
    truncated to whole seconds and never negative.

    Args:
        start_time: when imputation started.
        custom_progress_status: HTML widget receiving the status text.
        progress_bar: IntProgress-like widget whose ``max`` is the gap count.
    """
    global gaps_done
    # Snapshot once: the imputation thread increments gaps_done concurrently,
    # and the bar label, bar value and ETA should agree with each other.
    done = gaps_done
    total = progress_bar.max
    progress_bar.description = "%d/%d: " % (done, total)
    progress_bar.value = done
    elapsed = datetime.now() - start_time
    # Before the first gap finishes, treat all elapsed time as one gap's worth.
    per_gap = elapsed / max(done, 1)
    eta = max(per_gap * (total - done), timedelta(0))
    eta -= timedelta(microseconds=eta.microseconds)
    elapsed -= timedelta(microseconds=elapsed.microseconds)
    custom_progress_status.value = f"ETA: {eta} ({elapsed} elapsed)"
