# Imports

In [None]:
import glob
import json
import os
import random
import subprocess
import time

from itertools import product
from typing import List, Tuple, Dict

from benchmark_utils import *

# Functions to generate the outputs

In [None]:
def get_ground_truth_path(input: str) -> str:
    """
    Locate the ground truth file matching an input file
    :param input: input file path
    :return: first existing candidate path, or None when no candidate exists
    """
    same_name = input.replace("input", "ground truth")
    # Try the ground truth with the same name first, then its .csv variant
    for candidate in (same_name, same_name.replace(".png", ".csv")):
        if os.path.exists(candidate):
            return candidate
    return None


def pre_run(inputs: List[str], labels: List[str], has_gt: bool = True) -> List[Tuple[str, str]]:
    """
    Pre-run to get the list of files to run with the program to run
    :param inputs: list of input files
    :param labels: list of labels
    :param has_gt: if True, only keep the files for which a ground truth file is found
    :return: shuffled list of (program, input file) to run, with the first pair
             repeated at the end; empty list when there is nothing to run
    """
    if has_gt:
        inputs_filtered = [x for x in inputs if get_ground_truth_path(x)]
    else:
        inputs_filtered = list(inputs)
    print("Unselected for no ground truth: ",
          len(inputs) - len(inputs_filtered))

    to_run = list(product(labels, inputs_filtered))
    if not to_run:
        # No label or no usable input: nothing to shuffle or to repeat
        return to_run

    random.shuffle(to_run)
    # First launch is always slow: queue the first pair a second time at the end
    to_run.append(to_run[0])

    return to_run


def get_time_json(time_json_info_save_path: str) -> dict:
    """
    Load the timing information stored in a json file
    :param time_json_info_save_path: path to the json file
    :return: the parsed json content, or an empty dict if the file does not exist
    """
    if not os.path.exists(time_json_info_save_path):
        return {}
    with open(time_json_info_save_path, "r") as handle:
        return json.load(handle)


def get_file_out(file_in: str, method: str) -> str:
    """
    Get the output file path (and create the folder if needed)
    :param file_in: input file path
    :param method: method name, used as the name of the sub-folder holding the output
    :return: output file path, without extension
    """
    # The output tree mirrors the input tree: "input" is replaced by "output"
    file_out = file_in.replace("input", "output")
    path_file_out_method = os.path.join(os.path.dirname(file_out), method)
    # makedirs also creates missing parent folders and accepts an existing folder
    os.makedirs(path_file_out_method, exist_ok=True)
    input_basename_without_ext = os.path.splitext(
        os.path.basename(file_out))[0]
    return os.path.join(path_file_out_method, input_basename_without_ext)


def run(to_run: List[Tuple[str, str]], cmd_builder, time_json_info_save_path: str) -> None:
    """
    Compute the outputs for a list of input files
    :param to_run: list of (method, input file) to run
    :param cmd_builder: function that build the command line, called as
                        cmd_builder(method, input file, output file)
    :param time_json_info_save_path: path to the json file; existing content is
                                     loaded, updated and written back
    """
    time_info_json = get_time_json(time_json_info_save_path)
    for program, file_in in to_run:
        file_out = get_file_out(file_in, program)
        cmd = cmd_builder(program, file_in, file_out)

        # perf_counter is monotonic, so the measure is not affected by clock updates
        t0 = time.perf_counter()
        completed = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        elapsed = time.perf_counter() - t0

        if completed.returncode != 0:
            # The time is still recorded, but a failed run should not go unnoticed
            print(
                f"Warning: {program} exited with code {completed.returncode} on {file_in}")

        file_in_no_ext = os.path.splitext(os.path.basename(file_in))[0]
        # A pair run twice keeps the time of its last run
        time_info_json.setdefault(program, {})[file_in_no_ext] = elapsed

    with open(time_json_info_save_path, "w") as f:
        json.dump(time_info_json, f, indent=2)


def get_inputs(dataset: str, t: str, item) -> List[str]:
    """
    Get the input files for a dataset
    :param dataset: trade_directories | music_sheets | maps
    :param t: train | test
    :param item: -1 for all, or a list of index
    :return: list of input files, sorted by path
    """
    # NOTE: without recursive=True, "**" matches like "*" (direct children only)
    glob_path = os.path.join(dataset_folder, dataset, t, "input/**")
    # glob returns the paths in arbitrary order: sort them so that the indexes
    # given in `item` always select the same files
    inputs = sorted(glob.glob(glob_path))
    if isinstance(item, list):
        inputs = [inputs[i] for i in item]
    return inputs


def compute_outputs(dataset: str, labels: List[str], cmd_builder, item=-1, to_test=-1, ds_type="full") -> None:
    """
    Compute the outputs for a dataset for a list of labels
    :param dataset: trade_directories | music_sheets | maps
    :param labels: list of labels
    :param cmd_builder: function that build the command line
    :param item: -1 for all, or a list of index
    :param to_test: -1 for all labels, a single index, or a list of index
    :param ds_type: train | test | full
    :raises ValueError: if dataset or ds_type is not one of the accepted values
    """
    if ds_type not in ("train", "test", "full"):
        raise ValueError("Bad type : train | test | full")

    if dataset not in ("trade_directories", "music_sheets", "maps"):
        raise ValueError(
            "Bad dataset : trade_directories | music_sheets | maps")

    if isinstance(to_test, list):
        labels = [labels[i] for i in to_test]
    elif to_test != -1:
        labels = [labels[to_test]]

    splits = ["train", "test"] if ds_type == "full" else [ds_type]
    # Inputs of the maps dataset are not filtered on their ground truth
    has_gt = dataset != "maps"

    for t in splits:
        inputs = get_inputs(dataset, t, item)

        to_run = pre_run(inputs, labels, has_gt)

        # One time.json per dataset split
        time_json_info_save = os.path.join(
            dataset_folder, dataset, t, "time.json")
        run(to_run, cmd_builder, time_json_info_save)

# Comparisons

## Pylene predictors

In [None]:
# Maps each predictor label to the integer id given to the binary as `--tracker=<id>`
# by the command builders below.
# NOTE(review): the cells below pass `predictor_labels`, which is not defined in this
# file; presumably it comes from benchmark_utils. Confirm.
predictor_labels_id: Dict[str, int] = {
    'Last observation': 3,
    'SMA': 4,
    'EMA': 5,
    'Double exponential': 2,
    'Kalman': 0,
    'One euro': 1,
}

### Trade directories (vector + time)

In [None]:
# Fixed command line options used for every trade directories run
predictors_trade_directories_args: List[str] = [
    "--blumi=150",
    "--llumi=150",
    "--discontinuity_relative=1",
    "--minLen=300",
]


def predictors_trade_directories_cmd_builder(p: str, file_input: Path, output_filename: Path) -> List[str]:
    """
    Build the pylene command line for a trade directories input
    :param p: predictor label, key of predictor_labels_id
    :param file_input: input file path
    :param output_filename: output file path without extension (".csv" is appended)
    :return: the command line as a list of arguments
    """
    return [
        pylene_bin,
        *predictors_trade_directories_args,
        f"--input={file_input}",
        f"--vector_output={output_filename}.csv",
        f"--tracker={predictor_labels_id[p]}",
    ]

In [None]:
# Run every label of predictor_labels on the train split of trade_directories
compute_outputs(
    "trade_directories",
    predictor_labels,
    predictors_trade_directories_cmd_builder,
    ds_type="train")

### Music sheets (pixel + time)

In [None]:
# Fixed command line options used for every music sheets run
predictor_music_sheets_args: List[str] = [
    "--max_thickness=6",
    "--traversal_mode=0",
    "--discontinuity_relative=5",
    "--discontinuity_absolute=10",
    "--sigma_thickness_min_adv=3",
    "--minLen=1000",
    "--type_out=3",
]


def predictor_music_sheets_cmd_builder(p: str, file_input: Path, file_output: Path) -> List[str]:
    """
    Build the pylene command line for a music sheets input
    :param p: predictor label, key of predictor_labels_id
    :param file_input: input file path
    :param file_output: output file path without extension (".png" is appended)
    :return: the command line as a list of arguments
    """
    return [
        pylene_bin,
        *predictor_music_sheets_args,
        f"--input={file_input}",
        f"--pixel_output={file_output}.png",
        f"--tracker={predictor_labels_id[p]}",
    ]

In [None]:
# Run every label of predictor_labels on both the train and test splits of music_sheets
compute_outputs(
    "music_sheets",
    predictor_labels,
    predictor_music_sheets_cmd_builder,
    ds_type="full")

### Maps (time)

In [None]:
# Fixed command line options used for every maps run
predictor_map_args: List[str] = [
    "--blumi=180",
    "--llumi=180",
    "--discontinuity_relative=4",
    "--minLen=20",
    "--type_out=3",
    "--max_thickness=6",
]


def predictor_map_cmd_builder(p: str, file_input: Path, file_output: Path) -> List[str]:
    """
    Build the pylene command line for a maps input
    :param p: predictor label, key of predictor_labels_id
    :param file_input: input file path
    :param file_output: output file path without extension (".png" is appended)
    :return: the command line as a list of arguments
    """
    return [
        pylene_bin,
        *predictor_map_args,
        f"--input={file_input}",
        f"--output={file_output}.png",
        f"--tracker={predictor_labels_id[p]}",
    ]

In [None]:
# Run every label of predictor_labels on both splits of maps; compute_outputs does
# not filter the inputs on their ground truth for this dataset
compute_outputs(
    "maps",
    predictor_labels,
    predictor_map_cmd_builder,
    ds_type="full")

## Pylene vs. state of the art

In [None]:
# Labels of the compared methods: each one is a key of sta_trade_directories_args and
# is appended to "lsd_" to form the executable name in sta_trade_directories_cmd_builder
sta_labels: List[str] = [
    "pylene",
    "edlines",
    "ocv_hough",
    "cannylines",
    "lsd",
    "lsd_m",
    "elsed",
    "ag3line"
]

### Trade directories (vector + time)

In [None]:
# Command line options of each method for the trade directories dataset
sta_trade_directories_args: Dict[str, List[str]] = {
    "pylene": [
        "--llumi=150",
        "--blumi=150",
        "--discontinuity_relative=1",
        "--minLen=300",
        "--tracker=0",
    ],
    "edlines": ["--maxDistanceGap=10", "--minLen=300"],
    "ocv_hough": ["--maxGap=10", "--minLen=300", "--binthresh=150", "--threshold=60"],
    "cannylines": ["--minLen=300"],
    "lsd": ["--scale=0.99", "--sigma_coef=4.5"],
    "lsd_m": ["--minLen=300", "--scale=0.99", "--sigma_coef=4.5"],
    "elsed": ["--minLen=300", "--scale=0.99", "--sigma_coef=4.5"],
    "ag3line": ["--minLen=300", "--maxGap=10"],
}


def sta_trade_directories_cmd_builder(program: str, file_input: Path, file_output: Path) -> List[str]:
    """
    Build the command line of a method for a trade directories input
    :param program: method label, key of sta_trade_directories_args
    :param file_input: input file path
    :param file_output: output file path without extension (".csv" is appended)
    :return: the command line as a list of arguments
    """
    # The executable of a method is named "lsd_<label>" inside BIN_FOLDER
    executable = BIN_FOLDER + "/lsd_" + program
    return [
        executable,
        *sta_trade_directories_args[program],
        f"--input={file_input}",
        f"--output={file_output}.csv",
    ]

In [None]:
# Run every method of sta_labels on both the train and test splits of trade_directories
compute_outputs(
    "trade_directories",
    sta_labels,
    sta_trade_directories_cmd_builder,
    ds_type="full")