In [1]:

import os
import pickle

from sklearn.model_selection import KFold
from sklearn.metrics import roc_auc_score, accuracy_score
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

from color_corrector.image_features import image2features
import json
import numpy as np
from PIL import Image
from tqdm.notebook import tqdm

In [2]:
# Absolute location of the gzipped, pickled classifier: "resources/" lives one
# directory above the notebook's working directory.
_model_relative = os.path.join(os.pardir, "resources", "white_classifier.pkl.gz")
model_path = os.path.normpath(os.path.join(os.getcwd(), _model_relative))
model_path

'/home/padre/rojects/korrekturVonScans/resources/white_classifier.pkl.gz'

In [3]:
# Resolve the dataset directory relative to the working directory, the same way
# model_path is resolved, instead of hard-coding one user's home directory.
# NOTE(review): assumes the notebook is started from a direct sub-directory of the
# project root (as the recorded model_path output suggests) - confirm.
data_path = os.path.abspath(os.path.join("..", "datasets", "color_corrections"))

# JSON is UTF-8 by specification; do not depend on the locale's default encoding.
with open(os.path.join(data_path, "labeled_tasks.json"), encoding="utf-8") as inp:
    data: dict = json.load(inp)

In [4]:


# Build the training set: one feature vector (row of X) and one label per
# labeled point in `data`.
labels = []
img_path = None
img = None

# X is allocated lazily, once the first feature vector reveals its length.
X = None

# Sorting by image path groups the points of one image together, so every image
# is opened and converted to features only once.
tasks = sorted(data.values(), key=lambda v: v["original_image"])
for i, value in enumerate(tqdm(tasks)):
    new_image_path = value["original_image"]
    if new_image_path != img_path:
        img_path = new_image_path
        # The context manager releases the file handle as soon as the features
        # have been computed.
        with Image.open(f"{data_path}/{img_path}") as image:
            img = image2features(image)
    point_x = value["x"]
    point_y = value["y"]
    # NOTE(review): the feature array is indexed as [x, y] here, while the
    # prediction cell further down reshapes it as (height, width, channels).
    # Confirm that "x" in labeled_tasks.json really is the row index.
    point = img[point_x, point_y, :]
    if X is None:
        X = np.empty((len(data), point.shape[0]))
    X[i, :] = point
    # Only the first entry of the "labeled" list is used as the class label.
    labels.append(value["labeled"][0])

if X is None:
    # Fail here with a clear message instead of later inside scikit-learn.
    raise ValueError("labeled_tasks.json does not contain any labeled points")

labels = np.array(labels, dtype=object)

  0%|          | 0/8190 [00:00<?, ?it/s]

In [7]:
# Classification pipeline: standardise the features, then fit a logistic
# regression with scikit-learn's default settings.
model = make_pipeline(
    StandardScaler(),
    LogisticRegression(),
)

In [8]:
import gzip

# 10-fold cross-validation of the pipeline (KFold without shuffling, i.e.
# contiguous folds), followed by a final fit on all data and a gzip-compressed
# pickle dump of the fitted model.
y = labels
k_fold = KFold(n_splits=10)
fold_accuracies = []
fold_aucs = []

for index_train, index_test in tqdm(k_fold.split(X, y), total=k_fold.n_splits):
    X_train, y_train = X[index_train], y[index_train]
    X_test, y_test = X[index_test], y[index_test]

    model.fit(X_train, y_train)

    predictions = model.predict(X_test)
    # Column 1 holds the probability of the second class in model.classes_.
    predictions_proba = model.predict_proba(X_test)
    fold_accuracies.append(accuracy_score(y_true=y_test, y_pred=predictions))
    fold_aucs.append(roc_auc_score(y_true=y_test, y_score=predictions_proba[:, 1]))

# acc / auc keep the summed scores; the printed values are the fold averages.
acc = sum(fold_accuracies)
auc = sum(fold_aucs)
print("acc", acc / k_fold.n_splits)
print("auc", auc / k_fold.n_splits)
# model = LogisticRegression(class_weight={1: 10, 0: 1})

# Refit on the complete data set before persisting the model.
model.fit(X, y)
with gzip.open(model_path, "wb", compresslevel=9) as out:
    pickle.dump(model, out)

  0%|          | 0/10 [00:00<?, ?it/s]

acc 0.9719169719169718
auc 0.9794595873905706


In [None]:
"""
acc 0.9720390720390719
auc 0.9900852366367772
"""

In [66]:
# Cell-local imports (ideally these would live in the import cell at the top).
from copy import deepcopy
import os
import shutil


def whiten_background(image_array, predictions, step=10, width=100,
                      min_background_share=0.20, black_threshold=75, logfile=None):
    """Return a corrected copy of ``image_array``; the input is not modified.

    The image is processed in ``step`` x ``step`` tiles, top-to-bottom and
    left-to-right. For every tile, the mean colour of the pixels predicted as
    class 0 inside a surrounding window of about ``width`` pixels is estimated,
    and the tile is rescaled so that this colour maps to 255 in every channel.

    Parameters
    ----------
    image_array : array of shape (height, width, channels)
        Source pixels. NOTE(review): assumed to be uint8 - confirm.
    predictions : array of shape (height, width)
        Per-pixel class predictions; pixels equal to 0 are used as the
        reference colour. NOTE(review): presumably class 0 is the paper
        background - confirm against the labels.
    step : int
        Edge length of the tiles that are rescaled.
    width : int
        Edge length of the window used to estimate the reference colour.
    min_background_share : float
        The reference colour is only updated when more than this share of the
        window is class 0; otherwise the previous estimate is reused.
    black_threshold : int
        After rescaling, pixels whose brightest channel is below this value
        are set to 0.
    logfile : file-like or None
        If given, the bounds of every rescaled tile are written to it.

    Returns
    -------
    Array with the same shape and dtype as ``image_array``.
    """
    background = predictions == 0
    height, width_px = background.shape
    half = width // 2
    corrected = image_array.copy()
    # Floor for the divisor: only an exactly-zero channel mean is affected, so
    # 0 / 0 can no longer produce NaN before the uint8 cast.
    tiny = np.finfo(float).eps

    # Raw (unsmoothed) mean of the previously accepted window; seeded with the
    # global class-0 mean of the whole image.
    mean_prev = image_array[background].mean(axis=0)
    mean = None
    # The original iterated over range(0, h + step, step); the extra band lies
    # completely outside the image and only produced empty tiles.
    for tile_top in range(0, height, step):
        for tile_left in range(0, width_px, step):
            top = max(0, tile_top - half)
            bottom = min(tile_top + half, height)
            left = max(0, tile_left - half)
            right = min(tile_left + half, width_px)

            window_mask = background[top:bottom, left:right]
            if window_mask.mean() > min_background_share:
                window_mean = image_array[top:bottom, left:right, :][window_mask].mean(axis=0)
                # Blend with the previous raw window mean to avoid visible
                # jumps between neighbouring tiles.
                mean = 0.25 * mean_prev + 0.75 * window_mean
                mean_prev = window_mean
            if mean is None:
                # No usable reference colour yet: leave the tile untouched.
                continue

            if logfile is not None:
                print(f"{tile_top}: {tile_top + step}, {tile_left}: {tile_left + step},", file=logfile)
            tile = image_array[tile_top: tile_top + step, tile_left: tile_left + step, :]
            scaled = tile * 255.0 / np.maximum(mean, tiny)
            corrected[tile_top: tile_top + step, tile_left: tile_left + step, :] = (
                np.clip(scaled, 0, 255).astype(np.uint8)
            )

    corrected[corrected.max(axis=2) < black_threshold] = 0
    return corrected


# Input directory resolved relative to the working directory, like model_path.
# NOTE(review): assumes the notebook is started from a direct sub-directory of the
# project root (as the recorded model_path output suggests) - confirm.
dir_path = os.path.abspath(os.path.join("..", "images"))
dir_path_out = "/tmp/korrekturVonScans_images/"
dir_path_out_compare = "/tmp/korrekturVonScans_images/compare/"

# Start from an empty output directory on every run.
if os.path.isdir(dir_path_out):
    shutil.rmtree(dir_path_out)
os.makedirs(dir_path_out)
os.makedirs(dir_path_out_compare)
size_in = 0
size_out = 0

with open("/tmp/korrekturVonScans_images.log", "w") as logfile:
    for image_path in tqdm(sorted(os.listdir(dir_path))):
        abs_path_in = os.path.join(dir_path, image_path)
        abs_path_out = os.path.join(dir_path_out, image_path)
        abs_path_out_compare = os.path.join(dir_path_out_compare, image_path)

        # Read pixels and features while the file is open, then release it.
        with Image.open(abs_path_in) as image:
            array_copy = np.array(image)
            image_array_features = image2features(image)
        h, w, _ = array_copy.shape

        # One row of features per pixel, then back to an (h, w) class map.
        image_array_flat = image_array_features.reshape(h * w, image_array_features.shape[-1])
        predictions = model.predict(image_array_flat).reshape(h, w)

        # Only correct images in which class 0 covers more than half the pixels.
        if (predictions == 0).mean() > 0.5:
            image_array = whiten_background(array_copy, predictions, logfile=logfile)
        else:
            image_array = array_copy

        Image.fromarray(image_array).save(abs_path_out)
        # Side-by-side original | corrected image for visual inspection.
        Image.fromarray(np.hstack([array_copy, image_array])).save(abs_path_out_compare)

        size_in += os.stat(abs_path_in).st_size
        size_out += os.stat(abs_path_out).st_size

print(size_in, size_out)

  0%|          | 0/234 [00:00<?, ?it/s]


KeyboardInterrupt



0.0026983987810779474

In [None]:
# FIXME: unfinished scratch statement. It is commented out because a bare
# `fixed =` is a SyntaxError and stops "Restart Kernel -> Run All".
# fixed =
