In [1]:
import ipywidgets as widgets
from IPython.display import display, clear_output
import numpy as np
import json
import os
from pathlib import Path
import random
import traceback
import cv2
import torch

from skopt import Optimizer
from skopt.space import Real, Integer, Space

# Where rated comparisons are persisted, and the folder holding the test images.
HISTORY_FILE = Path('./hp_optimization_history_absolute_scores.json')
image_folder = './optimizer_target_images'

# Keep every regular file found directly inside the image folder
# (directories and other non-file entries are filtered out).
_candidates = (os.path.join(image_folder, entry) for entry in os.listdir(image_folder))
input_image_paths = list(filter(os.path.isfile, _candidates))
if len(input_image_paths) == 0:
    raise FileNotFoundError(f"No image files found in the folder: {image_folder}")
print(f"Images found for Upscaling: {len(input_image_paths)}")

# Growth factor applied to width and height in each upscaling pass.
scale = 1.414213562 # scaling factor per iteration

# Tunable upscaler hyperparameters.
# Each value is a tuple (low, high, kind, initial):
#   low / high - bounds handed to the skopt dimension,
#   kind       - 'continuous' (skopt Real) or 'integer' (skopt Integer),
#   initial    - value used for the very first comparison.
# The insertion order of the keys defines the parameter order used everywhere else.
PARAMETER_SPACE_CONFIG = {
    'a': (1.0, 14.0, 'continuous', 7.99),
    'b': (1.0, 10.0, 'continuous', 9.93),
    'c': (0.1, 3.0, 'continuous', 0.104),
    'd': (0.1, 5.0, 'continuous', 1.0),
    'e': (0.1, 0.8, 'continuous', 0.799),
    'r': (2, 4, 'integer', 3),
    't': (1, 6, 'integer', 1),
    'u': (0.1, 12.0, 'continuous', 5.033),
    'k': (7, 8, 'integer', 7),
    's': (1.1, 3.0, 'continuous', 1.107),
    'v': (0.0, 10.0, 'continuous', 9.622),
    'w': (0, 30, 'integer', 26),
    'z': (0.0, 3.0, 'continuous', 0.308),
}

# --- skopt search space definition ---
# Build one skopt dimension per configured parameter and keep the parallel
# bookkeeping structures (names, initial values) in exactly the same order.
search_space_dims = []
param_names = []
initial_config_list = []
initial_config_dict = {}
for name, (low, high, ptype, initial) in PARAMETER_SPACE_CONFIG.items():
    if ptype == 'continuous':
        dimension = Real(low, high, name=name)
    elif ptype == 'integer':
        dimension = Integer(low, high, name=name)
    else:
        # Silently skipping the dimension would leave param_names longer than
        # the search space and misalign every later list <-> dict conversion.
        raise ValueError(f"Unknown parameter type {ptype!r} for parameter {name!r}")
    search_space_dims.append(dimension)
    param_names.append(name)
    initial_config_list.append(initial)
    initial_config_dict[name] = initial

search_space = Space(search_space_dims)

# Optimization settings: per-session and overall caps on rated comparisons.
MAX_COMPARISONS_PER_SESSION = 10
MAX_COMPARISONS_TOTAL = 525

# Choose the torch device for the upscaler and report the choice.
if not torch.cuda.is_available():
    upscaler_device = torch.device("cpu")
    print("CUDA not found. Using CPU for upscaler.")
else:
    upscaler_device = torch.device("cuda")
    print("Using CUDA for upscaler.")

from knn_torch_upscaler import scale_up_pytorch

def upscale_image_cv2(input_path: Path, params: dict) -> np.ndarray:
    """Upscale one image with ``scale_up_pytorch`` using the given hyperparameters.

    The image is read with OpenCV, resized to 120x120 and then enlarged in up
    to six passes, each growing width and height by the module-level ``scale``.

    Args:
        input_path: Path of the image file to read.
        params: Mapping that contains a value for every name in ``param_names``.

    Returns:
        The upscaled image as a uint8 array in (height, width, channel) layout,
        or a solid-colour 60x30 placeholder when a step fails (the colour
        identifies which step failed, see the inline comments).
    """
    img = cv2.imread(str(input_path))
    if img is None:
        return np.full((60, 30, 3), (0, 0, 255), dtype=np.uint8)  # Red error placeholder
    try:
        a, b, c, d, e, r, t, u, k, s, v, w, z = [params[p] for p in param_names]
        k = int(k); r = int(r); t = int(t); w = int(w)
        if k % 2 == 0:
            k -= 1  # force k to be odd
    except KeyError as ke:
        print(f"ERROR: Missing parameter in upscale_image_cv2: {ke}")
        return np.full((60, 30, 3), (0, 255, 0), dtype=np.uint8)  # Green error placeholder
    except Exception as ex:
        print(f"ERROR: Parameter conversion failed in upscale_image_cv2: {ex}")
        return np.full((60, 30, 3), (255, 0, 0), dtype=np.uint8)  # Blue error placeholder

    current_img = cv2.resize(img, (120, 120), interpolation=cv2.INTER_AREA)
    current_img = torch.tensor(current_img, dtype=torch.float16).permute(2, 0, 1).to(upscaler_device)
    try:
        for _ in range(6):
            # Use dedicated names for the image size: reusing ``w`` here would
            # overwrite the hyperparameter ``w`` that is passed to the upscaler.
            cur_h, cur_w = current_img.shape[1], current_img.shape[2]
            target_w = int(cur_w * scale + 0.5)
            target_h = int(cur_h * scale + 0.5)
            if target_w <= cur_w or target_h <= cur_h:
                break
            current_img = scale_up_pytorch(current_img, target_w, target_h, upscaler_device,
                                           a, b, c, d, e, r, t, u, k, s, v, w, z)
        result = current_img.permute(1, 2, 0).cpu().numpy()
        # Saturate before the integer cast so out-of-range values do not wrap around.
        current_img = np.clip(result, 0, 255).astype(np.uint8)
    except Exception as upscale_err:
        print(f"ERROR during scale_up_pytorch: {upscale_err}")
        return np.full((60, 30, 3), (255, 255, 0), dtype=np.uint8)  # Cyan error placeholder
    return current_img

# --- skopt Bayesian Optimization Logic ---
def params_to_list(params_dict: dict) -> list:
    """Return the values of ``params_dict`` ordered like ``param_names``.

    Raises KeyError when a name from ``param_names`` is missing.
    """
    return list(map(params_dict.__getitem__, param_names))

def list_to_params(params_list: list) -> dict:
    """Pair the values of ``params_list`` with ``param_names`` positionally.

    Pairing stops at the shorter of the two sequences.
    """
    return dict(zip(param_names, params_list))

# Optimizer over the search space: Gaussian-process surrogate ("gp") with the
# "LCB" acquisition function and a fixed seed. The optimizer minimises, which
# is why user scores are negated before they are handed to tell().
skopt_optimizer = Optimizer(
    search_space,
    base_estimator="gp",
    random_state=42,
    acq_func="LCB",
)

# --- Core Logic: Load, UI, Interact, Save ---
# Session state shared by the UI callbacks below.
comparison_history = []          # entries of the form (params_A, params_B, score_A, score_B)
comparison_count = 0             # total number of recorded comparisons (including skips)
comparisons_in_this_session = 0
current_params_A = None # Dict Format
current_params_B = None # Dict Format

# Replay a previously saved history into the optimizer so that a new session
# continues where the last one stopped.
if HISTORY_FILE.exists():
    try:
        with open(HISTORY_FILE, 'r') as f:
            comparison_history_loaded = json.load(f)
        if not isinstance(comparison_history_loaded, list):
            raise ValueError("history file does not contain a list of entries")

        # Map point -> negated score; a point rated several times keeps its latest score.
        points_to_tell_map = {}

        for entry in comparison_history_loaded:
            if not isinstance(entry, (list, tuple)) or len(entry) != 4:
                print(f"Warning: Ignore invalid history entry: {entry}")
                continue
            pA_dict, pB_dict, sA, sB = entry
            try:
                # Skipped pairs are stored with None scores and carry no information.
                if sA is not None:
                    points_to_tell_map[tuple(params_to_list(pA_dict))] = -float(sA)
                if sB is not None:
                    points_to_tell_map[tuple(params_to_list(pB_dict))] = -float(sB)
            except (KeyError, TypeError, ValueError) as entry_err:
                # One malformed entry must not discard the rest of the history.
                print(f"Warning: Ignore invalid history entry: {entry} ({entry_err})")

        if points_to_tell_map:
            points_list = [list(p) for p in points_to_tell_map.keys()]
            results_list = list(points_to_tell_map.values())
            skopt_optimizer.tell(points_list, results_list)
            print(f"Loaded and told optimizer about {len(points_list)} unique points from history.")
        else:
            print("History file loaded, but contained no valid scores to tell the optimizer.")

        # Always keep what was loaded: the next save rewrites the whole file,
        # so dropping the entries here would erase them on disk.
        comparison_history = comparison_history_loaded
        comparison_count = len(comparison_history)

    except Exception as e:
        print(f"WARN: Failed to load or process history: {e}")
        traceback.print_exc(limit=2)
        comparison_history = []
        comparison_count = 0
        # The next save starts from an empty history and overwrites the file;
        # keep a copy of the unusable file so earlier ratings are not lost.
        try:
            backup_path = HISTORY_FILE.with_name(HISTORY_FILE.name + '.bak')
            backup_path.write_bytes(HISTORY_FILE.read_bytes())
            print(f"WARN: Unusable history copied to {backup_path}.")
        except OSError as backup_err:
            print(f"WARN: Could not back up the history file: {backup_err}")

def suggest_pair() -> tuple[dict, dict]:
    """Pick the two parameter sets to compare next.

    A is the initial configuration while the optimizer has no observations,
    otherwise the observed point with the lowest (negated) score. B is the
    optimizer's next suggestion.

    Returns:
        ``(params_A, params_B)`` as parameter dictionaries.
    """
    if not skopt_optimizer.Xi:
        print("Suggesting: Initial Config vs. Optimizer suggestion")
        # Hand out a copy so callers cannot alter the module-level defaults.
        params_A_dict = dict(initial_config_dict)
    else:
        print("Suggesting: Current best vs. Optimizer suggestion")
        best_index = np.argmin(skopt_optimizer.yi)
        params_A_dict = list_to_params(skopt_optimizer.Xi[best_index])

    params_B_dict = list_to_params(skopt_optimizer.ask())

    # Retry a bounded number of times when the suggestion equals A; the retry
    # is capped in both cases so this can never loop forever.
    attempts = 0
    while params_A_dict == params_B_dict and attempts < 5:
        params_B_dict = list_to_params(skopt_optimizer.ask())
        attempts += 1

    if params_A_dict == params_B_dict:
        # Still identical: fall back to a random point of the search space.
        params_B_list = search_space.rvs(random_state=random.randint(0, 1000))[0]
        params_B_dict = list_to_params(params_B_list)

    return params_A_dict, params_B_dict

def get_best() -> dict | str:
    """Return the observed parameter set with the lowest stored objective value.

    Returns:
        The parameters as a dict, or a message string when the optimizer has
        no observations yet or the lookup fails.
    """
    observed_points = skopt_optimizer.Xi
    if not observed_points:
        return "Not enough data."

    print("Find best parameters from optimizer history...")
    try:
        winner = observed_points[np.argmin(skopt_optimizer.yi)]
        return list_to_params(winner)
    except Exception as e:
        print(f"ERROR when finding the best parameters: {e}")
        return "ERROR when finding the best parameters"

# --- Utility Functions ---
def cv2_to_bytes(img_cv2: np.ndarray) -> bytes:
    """Encode an image array as PNG bytes.

    Missing, empty, unconvertible or oddly shaped inputs are replaced by a
    black 10x10 image. Non-uint8 arrays are converted first: floating-point
    data is clipped to [0, 1] and scaled by 255, other data outside 0..255 is
    min-max normalised, anything else is cast directly.

    Raises:
        RuntimeError: if ``cv2.imencode`` reports failure.
    """
    def _blank() -> np.ndarray:
        return np.zeros((10, 10, 3), dtype=np.uint8)

    if img_cv2 is None or img_cv2.size == 0:
        img_cv2 = _blank()

    if img_cv2.dtype != np.uint8:
        try:
            if np.issubdtype(img_cv2.dtype, np.floating):
                converted = (np.clip(img_cv2, 0, 1) * 255).astype(np.uint8)
            elif img_cv2.max() > 255 or img_cv2.min() < 0:
                converted = cv2.normalize(img_cv2, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
            else:
                converted = img_cv2.astype(np.uint8)
            img_cv2 = converted
        except Exception as e:
            print(f"Warning: Image dtype conversion failed ({img_cv2.dtype}): {e}. Using fallback.")
            img_cv2 = _blank()

    # Accept 2-D images and 3-D images with 1, 3 or 4 channels.
    layout_ok = img_cv2.ndim == 2 or (img_cv2.ndim == 3 and img_cv2.shape[2] in [1, 3, 4])
    if not layout_ok:
        print(f"Warning: Unexpected image dimensions {img_cv2.shape}. Using fallback.")
        img_cv2 = _blank()

    encoded_ok, png_buffer = cv2.imencode(".png", img_cv2)
    if not encoded_ok:
        raise RuntimeError("cv2.imencode failed")
    return png_buffer.tobytes()

def make_serializable(data):
    """Recursively convert NumPy values into plain Python objects for ``json``.

    Args:
        data: A NumPy array or scalar, a dict/list/tuple containing such
            values, or any other object.

    Returns:
        Arrays become nested lists; NumPy booleans, integers and floats become
        ``bool``/``int``/``float``; complex scalars become a
        ``{'real': float, 'imag': float}`` dict; ``np.void`` becomes ``None``.
        Dicts, lists and tuples are converted element-wise (keys are left
        as they are). Everything else is returned unchanged.
    """
    if isinstance(data, np.ndarray):
        return data.tolist()
    if isinstance(data, np.bool_):
        return bool(data)
    # The abstract scalar classes cover every width, not just a fixed list.
    if isinstance(data, np.integer):
        return int(data)
    if isinstance(data, np.floating):
        return float(data)
    if isinstance(data, np.complexfloating):
        # .real/.imag are NumPy scalars themselves and need converting too.
        return {'real': float(data.real), 'imag': float(data.imag)}
    if isinstance(data, np.void):
        return None
    if isinstance(data, dict):
        return {k: make_serializable(v) for k, v in data.items()}
    if isinstance(data, (list, tuple)):
        return type(data)(make_serializable(i) for i in data)
    return data

# --- UI Setup ---
# Two image panes, one score slider per pane, action buttons, a progress
# header and an output area for log messages.
image_A_widget = widgets.Image(format='png', width=1000, height=1000)
image_B_widget = widgets.Image(format='png', width=1000, height=1000)

# Score inputs: integer sliders from 1 to 10 that start at 5.
_slider_common = dict(min=1, max=10, step=1, value=5, style={'description_width': 'initial'})
score_A_input = widgets.IntSlider(
    description='Score A (1=bad, 10=good):',
    layout=widgets.Layout(width='auto'),
    **_slider_common,
)
score_B_input = widgets.IntSlider(
    description='Score B (1=bad, 10=good):',
    layout=widgets.Layout(width='auto'),
    **_slider_common,
)

# Buttons
button_submit = widgets.Button(description="Send ratings", button_style='success', icon='check')
button_skip = widgets.Button(description="Skip", button_style='info', icon='forward')

progress_text = widgets.HTML(value="")
output_area = widgets.Output(layout={'padding': '5px'})

# Assemble the layout from top to bottom.
_pane_A = widgets.VBox([widgets.Label("Image A"), image_A_widget, score_A_input])
_pane_B = widgets.VBox([widgets.Label("Image B"), image_B_widget, score_B_input])
_button_row = widgets.HBox([button_submit, button_skip])
ui = widgets.VBox([
    progress_text,
    widgets.HBox([_pane_A, _pane_B]),
    _button_row,
    output_area,
])

def display_next_comparison():
    """Generate and show the next pair of upscaled images for rating.

    Stops with the final result once the total or per-session comparison limit
    is reached. Otherwise a random input image is upscaled with the two
    parameter sets returned by ``suggest_pair``; both results are shown and
    the buttons are enabled again. On any error the message is written to the
    output area and the buttons stay disabled.
    """
    global current_params_A, current_params_B

    def _printable(params: dict) -> dict:
        # Keep the parameter names and show plain Python numbers so the log
        # stays readable and can be mapped back to the configuration.
        shown = {}
        for key, value in params.items():
            if isinstance(value, (float, np.floating)):
                shown[key] = round(float(value), 3)
            elif isinstance(value, np.integer):
                shown[key] = int(value)
            else:
                shown[key] = value
        return shown

    if not input_image_paths:
        with output_area:
            clear_output(wait=True)
            print("ERROR: No input images found!")
        return

    if comparison_count >= MAX_COMPARISONS_TOTAL or comparisons_in_this_session >= MAX_COMPARISONS_PER_SESSION:
        reason = "Maximum number of comparisons achieved." if comparison_count >= MAX_COMPARISONS_TOTAL else "Limit reached for this session."
        show_final_result(reason)
        return

    progress_text.value = f"<b>Comparison {comparison_count + 1}/{MAX_COMPARISONS_TOTAL} (Session: {comparisons_in_this_session + 1}/{MAX_COMPARISONS_PER_SESSION})</b>"
    # Block input and reset the widgets while the new pair is being generated.
    button_submit.disabled = True; button_skip.disabled = True
    image_A_widget.value = b''; image_B_widget.value = b''
    score_A_input.value = 5; score_B_input.value = 5 # Reset scores

    try:
        selected_image_path = Path(random.choice(input_image_paths))
        img_name_log = f" (Bild: {selected_image_path.name})"

        with output_area:
            clear_output(wait=True)
            print(f"Generate pair {comparison_count+1}{img_name_log}...")

        params_A, params_B = suggest_pair()
        current_params_A, current_params_B = params_A, params_B

        # Display parameters for both images
        with output_area:
            print(f"Parameters for image A: {_printable(params_A)}")
            print(f"Parameters for image B: {_printable(params_B)}")

        img_A = upscale_image_cv2(selected_image_path, params_A)
        if img_A is None: raise RuntimeError("Upscaler A returned None")
        image_A_widget.value = cv2_to_bytes(img_A)
        with output_area:
            print("Image A generated.")

        img_B = upscale_image_cv2(selected_image_path, params_B)
        if img_B is None: raise RuntimeError("Upscaler B returned None")
        image_B_widget.value = cv2_to_bytes(img_B)
        with output_area:
            print("Image B generated. Please rate and send.")

        button_submit.disabled = False; button_skip.disabled = False

    except Exception as e:
        with output_area:
            clear_output(wait=True)
            print(f"ERROR when generating the comparison: {e}")
        traceback.print_exc(limit=2)
        progress_text.value += " <b style='color:red;'>ERROR!</b>"

def process_evaluation(score_a=None, score_b=None):
    """Record the verdict for the current pair, update the optimizer, continue.

    Args:
        score_a: Score for image A (1-10), or None to skip the pair.
        score_b: Score for image B (1-10), or None to skip the pair.

    The pair is skipped unless both scores are given. Every recorded pair is
    appended to the history and written to ``HISTORY_FILE``; rated pairs are
    additionally passed to the optimizer with negated scores because the
    optimizer minimises. The next comparison is displayed afterwards. Scores
    that are not whole numbers in 1..10 are rejected and nothing is recorded.
    """
    global comparison_count, comparisons_in_this_session

    button_submit.disabled = True; button_skip.disabled = True

    if current_params_A is None or current_params_B is None:
        # Entries without parameters cannot be replayed when the history is
        # loaded again, so do not record them.
        print("Error: No parameter pair is currently displayed; nothing to record.")
        display_next_comparison()
        return

    sA = sB = None
    if score_a is not None and score_b is not None:
        try:
            sA, sB = int(score_a), int(score_b)
        except (TypeError, ValueError):
            print("Error: Scores must be whole numbers.")
            button_submit.disabled = False
            button_skip.disabled = False
            return
        if not (1 <= sA <= 10 and 1 <= sB <= 10):
            print("Error: Scores must be between 1 and 10.")
            button_submit.disabled = False
            button_skip.disabled = False
            return
        print(f"Receive scores: A={sA}, B={sB}")
    valid_scores = sA is not None

    history_entry = (
        make_serializable(current_params_A),
        make_serializable(current_params_B),
        sA,
        sB,
    )
    comparison_history.append(history_entry)
    comparison_count = len(comparison_history)
    comparisons_in_this_session += 1

    try:
        # Write to a temporary file and swap it in afterwards so an
        # interrupted write cannot truncate the existing history.
        tmp_path = HISTORY_FILE.with_name(HISTORY_FILE.name + '.tmp')
        with open(tmp_path, 'w') as f:
            json.dump(comparison_history, f, indent=2)
        os.replace(tmp_path, HISTORY_FILE)
    except Exception as e:
        print(f"WARN: Failed to save the history: {e}")

    if valid_scores:
        try:
            pA_list = params_to_list(current_params_A)
            pB_list = params_to_list(current_params_B)
            neg_score_A = -float(sA)
            neg_score_B = -float(sB)

            # Tell optimizer about both points and their scores
            skopt_optimizer.tell([pA_list, pB_list], [neg_score_A, neg_score_B])
            print(f"Optimizer informed about 2 new points (Scores: {neg_score_A}, {neg_score_B}).")

        except Exception as e:
            print(f"ERROR when updating the skopt Optimizer: {e}")
            traceback.print_exc(limit=2)
    else:
        print("Pair skipped, optimizer not updated.")

    display_next_comparison()

def on_submit_clicked(b):
    """Click handler for the submit button: forward both slider values."""
    score_for_a = score_A_input.value
    score_for_b = score_B_input.value
    process_evaluation(score_for_a, score_for_b)

def on_skip_clicked(b):
    """Click handler for the skip button: record the pair without scores."""
    process_evaluation(None, None)

# Wire each button to its handler.
for _button, _handler in ((button_submit, on_submit_clicked), (button_skip, on_skip_clicked)):
    _button.on_click(_handler)

def show_final_result(reason: str):
    """Disable the inputs and report the best parameters found so far.

    Args:
        reason: Text explaining why the optimization stopped.

    Prints the best observed parameter set, the surrogate model's score
    estimate for it (when a fitted model exists) and displays one sample image
    upscaled with these parameters. All output goes to the output area.
    """
    button_submit.disabled = True; button_skip.disabled = True
    score_A_input.disabled = True; score_B_input.disabled = True
    with output_area:
        clear_output(wait=True)
        print("--- Optimization Finished ---")
        print(f"Reason: {reason} ({comparison_count} Total comparisons)")
        print("Estimate best parameters...")
        best_params = get_best()
        if not isinstance(best_params, dict):
            print(f"\nCould not find best parameters: {best_params}")
            return

        print("\nBest parameters found:")
        best_params_rounded = {k: round(v, 4) if isinstance(v, float) else v for k, v in best_params.items()}
        print(json.dumps(make_serializable(best_params_rounded), indent=2))

        try:
            if not skopt_optimizer.models:
                print("\nCould not estimate score for best parameter: no surrogate model fitted yet.")
            else:
                best_params_list = params_to_list(best_params)
                # The surrogate is fitted on the optimizer's transformed space,
                # so the query point has to go through the same transform.
                best_point = skopt_optimizer.space.transform([best_params_list])
                predicted_neg_score_mean, predicted_neg_score_std = skopt_optimizer.models[-1].predict(
                    best_point, return_std=True
                )
                # Scores were negated for the minimiser; flip the sign back.
                predicted_user_score = -predicted_neg_score_mean[0]
                print(f"\nEstimated best score (1-10): {predicted_user_score:.2f} (std: {predicted_neg_score_std[0]:.2f})")
        except Exception as pred_e:
            print(f"\nCould not estimate score for best parameter: {pred_e}")

        if not input_image_paths:
            print("\nNo images found to generate the final example.")
            return

        try:
            final_img_path = Path(random.choice(input_image_paths))
            print(f"\nGenerate final sample image ({final_img_path.name})...")
            final_img = upscale_image_cv2(final_img_path, best_params)
            if final_img is not None:
                final_widget = widgets.Image(value=cv2_to_bytes(final_img), format='png', width=400)
                display(final_widget)
                print("Final image showed.")
            else:
                print("ERROR: Upscaler returned None for final picture.")
        except Exception as e:
            print(f"ERROR when generating the final image: {e}")
            traceback.print_exc(limit=1)

# --- Start ---
# Show the UI and generate the first pair when input images are available.
print("Start interactive optimization with skopt...")
if input_image_paths:
     display(ui)
     display_next_comparison()
else:
     print("ERROR: No input images found in the folder. Please make sure that images are in ‘./optimizer_target_images’.")

Images found for Upscaling: 7
Using CUDA for upscaler.
Loaded and told optimizer about 515 unique points from history.
Start interactive optimization with skopt...


VBox(children=(HTML(value=''), HBox(children=(VBox(children=(Label(value='Image A'), Image(value=b'', height='…

funny configurations:  
{0.152, 1.906, 2.363, 3.138, 4.752, np.int64(4), np.int64(1), 2.714, np.int64(8), 5.973, 0.19, 11.254, np.int64(21)}
{0.781, 1.632, 2.702, np.int64(2), np.int64(4), 2.917, 0.252, np.int64(7), 8.071, 8.347, np.int64(9), 11.379, 2.412}
{0.566, 0.721, 1.144, 0.071, 4.643, np.int64(4), np.int64(5), 7.101, np.int64(8), 9.23, 10.993, 1.416, np.int64(22)}
{0.327, 1.801, 0.739, np.int64(1), 4.921, np.int64(4), 6.307, np.int64(7), 1.356, 9.86, 0.123, 0.184, np.int64(23)}
{0.795, 1.81, 2.569, np.int64(1), np.int64(4), 5.506, 1.505, np.int64(7), np.int64(6), 9.167, 10.263, 13.861, 0.189}
{0.79, np.int64(1), 2.309, 2.303, 4.756, np.int64(4), 6.636, 7.772, np.int64(8), 1.259, 3.68, 0.303, np.int64(25)}
{0.109, 1.427, 0.798, np.int64(3), np.int64(1), 1.61, 6.162, 6.247, np.int64(7), 1.808, 8.467, np.int64(26), 0.205}
{0.143, np.int64(1), 2.334, 3.213, 4.854, np.int64(4), 6.864, 2.49, np.int64(8), 0.25, np.int64(9), 11.644, 1.432}
{0.391, 1.688, 0.795, 3.132, np.int64(4), 1.523, 6.198, 6.391, np.int64(6), np.int64(8), 0.774, np.int64(18), 0.205}
{0.723, np.int64(1), 2.335, 2.57, np.int64(4), 3.889, 1.727, 6.039, np.int64(8), 9.97, 9.873, 0.289, np.int64(26)}
{0.744, 1.775, 2.937, 3.727, np.int64(3), 1.978, np.int64(6), 7.805, 8.729, np.int64(8), 2.903, 0.014, np.int64(23)}
{0.531, 0.767, 2.871, 3.376, np.int64(4), np.int64(2), 1.429, 1.191, np.int64(8), 5.934, np.int64(10), 11.242, 0.229}
{0.731, 1.747, 0.755, 3.547, 4.951, np.int64(4), np.int64(1), 5.804, np.int64(8), 2.754, 0.234, np.int64(20), 0.515}
{0.764, 1.818, 2.015, 3.492, np.int64(4), np.int64(2), 6.93, np.int64(7), 8.053, 0.096, 10.501, 2.998, np.int64(26)}
{0.657, 1.773, 0.449, np.int64(1), np.int64(4), 1.721, 6.825, np.int64(7), 8.925, 5.458, 11.703, np.int64(16), 0.196}
{0.552, 0.776, 1.903, 2.515, np.int64(4), 5.212, np.int64(6), 4.475, 8.361, 8.465, np.int64(8), 0.225, np.int64(30)}
{0.1, 1.5, 1.618, 0.8, np.int64(4), np.int64(1), 2.377, 1.1, np.int64(8), 9.686, 7.517, 0.273, np.int64(30)}
{0.1, 1.534, 0.8, np.int64(1), np.int64(4), 1.219, 6.27, 1.617, 8.235, np.int64(8), 7.239, 0.276, np.int64(30)}