In [None]:
from dsrobo.config import Config, JobConfig

In [None]:
from dsrobo.launcher.desmume import DeSmuMELauncher

In [None]:
from dsrobo.rom_loader.desmume import DeSmuMERomLoader

In [None]:
from dsrobo.job_actions.pipeline import JobActionPipelineBase

In [None]:
import dsrobo.utils

In [None]:
import atomacos

In [None]:
import time

In [None]:
# Global configuration; rom_directory is an absolute, machine-specific path.
config = Config(rom_directory="/Users/archiba/Documents/archiba/")

In [None]:
# Parse the job definition from its JSON file; the path is relative to the
# current working directory.
job_config = JobConfig.parse_file("../../../jobs/nds/POKEMON_SS/shiny_starter.json")

In [None]:
# Launcher object; its launch() is called below with the global config.
launcher = DeSmuMELauncher()

In [None]:
# ROM loader object; its load_rom() is called below on the launched application.
rom_loader = DeSmuMERomLoader()

In [None]:
# Launch first: load_rom() needs the application handle returned by launch(),
# together with both the global and the job configuration.
application = launcher.launch(config)
rom_loader.load_rom(application, config, job_config)

In [None]:
# Build the job-action pipeline from the global and job configurations.
pl = JobActionPipelineBase(config, job_config)

In [None]:
# Validate the pipeline before running it.
# NOTE(review): what is checked is defined in JobActionPipelineBase — not visible here.
pl.validate()

In [None]:
# Invoke the pipeline on the launched application.
pl(application)

In [None]:
from collections import OrderedDict
from pathlib import Path
from typing import Optional

import cv2
import numpy


def assert_find(directory: Path, pattern: str):
    """Return the single path under *directory* that matches glob *pattern*.

    Args:
        directory: Directory whose ``glob`` is searched.
        pattern: Glob pattern, interpreted relative to ``directory``.

    Returns:
        The one matching path.

    Raises:
        AssertionError: If the pattern matches zero paths or more than one.
    """
    found = list(directory.glob(pattern))
    # Raise explicitly instead of using ``assert`` so the uniqueness check
    # still runs under ``python -O`` and the failure names the pattern.
    if len(found) != 1:
        raise AssertionError(
            f"expected exactly one match for {pattern!r} in {directory}, "
            f"found {len(found)}"
        )
    return found[0]


def load_image(path: Path, interest_region: Optional[tuple[int, int, int, int]] = None) -> numpy.ndarray:
    """Read an image in colour mode, optionally cropped to a rectangle.

    Args:
        path: Image file to read.
        interest_region: Optional ``(x, y, w, h)`` rectangle. When given, only
            rows ``y:y + h`` and columns ``x:x + w`` of the image are returned.

    Returns:
        The array produced by ``cv2.imread``, or the requested slice of it.

    Raises:
        OSError: If ``cv2.imread`` produces no image for ``path``.
    """
    img = cv2.imread(str(path), cv2.IMREAD_COLOR)
    # cv2.imread reports failure by returning None rather than raising. Fail
    # here with the path, instead of leaking None to the caller or hitting a
    # TypeError on the slice below.
    if img is None:
        raise OSError(f"could not read image: {path}")
    if interest_region is None:
        return img
    x, y, w, h = interest_region
    return img[y:y + h, x:x + w]


def load_images(job_dir: Path, iteration_job_no: int, image_job_no: int, image_name: str,
                interest_region: Optional[tuple[int, int, int, int]] = None) -> dict[str, numpy.ndarray]:
    """Load one named image per iteration directory of a job.

    The iteration job directory is the unique match of
    ``{iteration_job_no}_*`` inside ``job_dir``. For each sub-directory of it,
    the unique match of ``{image_job_no}_*/{image_name}`` is loaded.

    Args:
        job_dir: Root directory of the job's artifacts.
        iteration_job_no: Numeric prefix of the iteration job directory.
        image_job_no: Numeric prefix of the job directory that holds the image.
        image_name: File name of the image inside that directory.
        interest_region: Optional ``(x, y, w, h)`` crop passed to ``load_image``.

    Returns:
        Mapping from each iteration directory's ``stem`` to its loaded image.
    """
    iteration_root = assert_find(job_dir, f"{iteration_job_no}_*")
    image_pattern = f"{image_job_no}_*/{image_name}"

    # Resolve every image path before reading any pixels, so a missing or
    # ambiguous file is reported before any image is loaded.
    located = {}
    for entry in iteration_root.glob("*"):
        if not entry.is_dir():
            continue
        located[entry.stem] = assert_find(entry, image_pattern)

    loaded = {}
    for label, image_path in located.items():
        loaded[label] = load_image(image_path, interest_region)
    return loaded


def dump_result(dest_root: Path, images: dict[str, numpy.ndarray], scores: OrderedDict) -> None:
    """Write the scored images into ``dest_root`` as PNG files.

    Each file is named ``{rank}-{key}-{score}.png``. ``rank`` is the position
    of the key in the iteration order of ``scores``, zero-padded to the number
    of digits in ``len(images)``.

    Args:
        dest_root: Output directory; created (with parents) if missing.
        images: Image arrays by key.
        scores: Score by key; its iteration order determines the rank.

    Raises:
        KeyError: If a key in ``scores`` is not present in ``images``.
        OSError: If ``cv2.imwrite`` reports that a file was not written.
    """
    n_digits = len(str(len(images)))
    dest_root.mkdir(exist_ok=True, parents=True)
    for rank, (key, score) in enumerate(scores.items()):
        dest_path = dest_root / f"{str(rank).zfill(n_digits)}-{key}-{score}.png"
        # cv2.imwrite returns False on failure instead of raising; surface it
        # so a partially written result directory does not go unnoticed.
        if not cv2.imwrite(str(dest_path), images[key]):
            raise OSError(f"could not write image: {dest_path}")


In [None]:
from pathlib import Path
from typing import Optional, Type
from collections import OrderedDict

import cv2
import numpy


class AnomalyModel:
    """Base interface for anomaly models: ``fit`` on images, ``evaluate`` one.

    Both methods raise ``NotImplementedError`` here and are meant to be
    overridden by subclasses.
    """

    def fit(self, images: dict[str, numpy.ndarray]):
        """Fit the model to ``images``; not implemented on the base class."""
        raise NotImplementedError

    def evaluate(self, image: numpy.ndarray):
        """Evaluate a single ``image``; not implemented on the base class."""
        raise NotImplementedError


class AbsSumAnomalyModel(AnomalyModel):
    """Scores an image by its summed absolute difference from a mean image."""

    def __init__(self):
        # Mean image computed by fit(); stays None until fit() is called.
        self.model = None

    def fit(self, images: dict[str, numpy.ndarray]):
        """Store the element-wise float32 mean of all values in ``images``.

        Args:
            images: Image arrays by key; only the values are used.

        Raises:
            ValueError: If ``images`` is empty.
        """
        # The mean of nothing would silently store NaN as the model and only
        # fail later, inside evaluate(); reject it up front.
        if not images:
            raise ValueError("cannot fit on an empty image collection")
        self.model = numpy.mean(list(images.values()), axis=0, dtype='float32')

    def evaluate(self, image: numpy.ndarray):
        """Return the sum of ``|image - mean image|`` over all elements.

        Args:
            image: Array with the same shape as the fitted mean image.

        Raises:
            RuntimeError: If ``fit`` has not been called yet.
            AssertionError: If ``image.shape`` differs from the model's shape.
        """
        if self.model is None:
            raise RuntimeError("AbsSumAnomalyModel.evaluate() called before fit()")
        # Explicit raise (not ``assert``) so the check survives ``python -O``;
        # without it, broadcasting could score a mismatched image silently.
        if image.shape != self.model.shape:
            raise AssertionError(
                f"image shape {image.shape} does not match model shape {self.model.shape}"
            )
        abs_diff = numpy.abs(image - self.model)
        return abs_diff.sum(axis=0).sum()


class EvaluateAnomalyDetector:
    """Wraps an ``AnomalyModel`` to fit it and rank images by their score."""

    def __init__(self, model_cls: Type[AnomalyModel] = AbsSumAnomalyModel):
        """Instantiate ``model_cls`` with no arguments and keep it as the model."""
        self.model = model_cls()

    def sample_img(self,
                   images: dict[str, numpy.ndarray],
                   dump_to: Optional[Path] = None):
        """Return the first image in ``images``, optionally writing it to disk.

        Args:
            images: Image arrays by key; the first key in iteration order is used.
            dump_to: When not None, the image is also written to this path
                with ``cv2.imwrite``.
        """
        first_key = list(images)[0]
        sample = images[first_key]
        if dump_to is not None:
            cv2.imwrite(str(dump_to), sample)
        return sample

    def fit_model(self, images: dict[str, numpy.ndarray]):
        """Fit the wrapped model on ``images``."""
        self.model.fit(images)

    def evaluate(self, images: dict[str, numpy.ndarray]):
        """Score every image and return the scores ordered highest first.

        Returns:
            An ``OrderedDict`` mapping key to score, sorted by descending score.
        """
        scored = {}
        for name, img in images.items():
            scored[name] = self.model.evaluate(img)
        ranked = sorted(scored.items(), key=lambda pair: pair[1], reverse=True)
        return OrderedDict(ranked)

In [None]:
# Load 'pk1.png' from the image job prefixed "6_" inside every iteration
# directory of the job prefixed "0_", cropped to the (x, y, w, h) region
# (275, 250, 200, 200). The artifacts path is relative to the working directory.
images = load_images(
    Path("artifacts/POKEMON_SS_Shiny_starter-b10032b3-c36a-4556-99d0-a54a4add941b/"),
    0,
    6,
    'pk1.png',
    interest_region=(275, 250, 200, 200)
)

In [None]:
# Fresh detector backed by the mean-image absolute-difference model.
anomaly_detector = EvaluateAnomalyDetector(AbsSumAnomalyModel)

In [None]:
# Fit on the loaded images; the same set is scored afterwards.
anomaly_detector.fit_model(images)

In [None]:
# Score every image against the fitted model; result is ordered highest score first.
scores = anomaly_detector.evaluate(images)

In [None]:
# Write the images to ./6_pk1 with file names carrying rank, key and score.
dump_result(Path("6_pk1"), images, scores)

In [None]:
# Load 'pk2.png' from the image job prefixed "8_" inside every iteration
# directory of the job prefixed "0_", cropped to the (x, y, w, h) region
# (275, 250, 200, 200). Rebinds `images`, replacing the previous set.
images = load_images(
    Path("artifacts/POKEMON_SS_Shiny_starter-b10032b3-c36a-4556-99d0-a54a4add941b/"),
    0,
    8,
    'pk2.png',
    interest_region=(275, 250, 200, 200)
)

In [None]:
# New detector instance so the model is fitted only on the current image set.
anomaly_detector = EvaluateAnomalyDetector(AbsSumAnomalyModel)

In [None]:
# Fit on the loaded images; the same set is scored afterwards.
anomaly_detector.fit_model(images)

In [None]:
# Score every image against the fitted model; result is ordered highest score first.
scores = anomaly_detector.evaluate(images)

In [None]:
# Write the images to ./8_pk2 with file names carrying rank, key and score.
dump_result(Path("8_pk2"), images, scores)

In [None]:
# Load 'pk3.png' from the image job prefixed "10_" inside every iteration
# directory of the job prefixed "0_", cropped to the (x, y, w, h) region
# (275, 250, 200, 200). Rebinds `images`, replacing the previous set.
images = load_images(
    Path("artifacts/POKEMON_SS_Shiny_starter-b10032b3-c36a-4556-99d0-a54a4add941b/"),
    0,
    10,
    'pk3.png',
    interest_region=(275, 250, 200, 200)
)

In [None]:
# New detector instance so the model is fitted only on the current image set.
anomaly_detector = EvaluateAnomalyDetector(AbsSumAnomalyModel)

In [None]:
# Fit on the loaded images; the same set is scored afterwards.
anomaly_detector.fit_model(images)

In [None]:
# Score every image against the fitted model; result is ordered highest score first.
scores = anomaly_detector.evaluate(images)

In [None]:
# Write the images to ./10_pk3 with file names carrying rank, key and score.
dump_result(Path("10_pk3"), images, scores)