In [None]:
import os
import random
import matplotlib.pyplot as plt
from PIL import Image
from skimage import io, filters
import numpy as np

from tqdm.notebook import tqdm

from PIL import Image
import numpy as np

import cv2
import copy
import time

from mpire import WorkerPool

from PIL import Image
from matplotlib import cm
from tqdm.notebook import tqdm


from combra import stats as cstats
from combra import approx as capprox
from combra import image as cimage

from numba import njit
import numpy as np
import cv2
import os

from combra.tests import test_fractal_dimensions
from combra.contours import contour_to_binary_mask, scale_contour, draw_contours

import json
from pathlib import Path



In [None]:
def preprocess_image(image_path, crop_rows=400, crop_cols=1300):
    """Load an image as grayscale, crop its borders, binarize it and find contours.

    Args:
        image_path: Location of the image file (``str`` or path-like object).
        crop_rows: Number of pixel rows removed from both the top and the bottom.
        crop_cols: Number of pixel columns removed from both the left and the right.

    Returns:
        tuple: ``(binary, cnts)`` where ``binary`` is the Otsu-thresholded crop and
        ``cnts`` is the contour collection returned by ``cv2.findContours``.

    Raises:
        FileNotFoundError: If OpenCV could not read ``image_path``.
        ValueError: If a crop margin is negative or the crop leaves no pixels.
    """
    if crop_rows < 0 or crop_cols < 0:
        raise ValueError(
            f"Crop margins must be non-negative, got rows={crop_rows}, cols={crop_cols}"
        )

    # cv2.imread does not raise on a missing/unreadable file, it returns None.
    img = cv2.imread(str(image_path), cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")

    # Explicit stop indices so that a margin of 0 keeps the whole axis
    # (a plain `img[0:-0]` would be empty).
    height, width = img.shape[:2]
    img = img[crop_rows:height - crop_rows, crop_cols:width - crop_cols]
    if img.size == 0:
        raise ValueError(
            f"Image {image_path} ({height}x{width}) is too small for a crop of "
            f"{crop_rows} rows / {crop_cols} columns per side"
        )

    # With THRESH_OTSU the threshold argument (0) is ignored and chosen automatically.
    _, binary = cv2.threshold(img, 0, 255, cv2.THRESH_OTSU)

    cnts, _ = cv2.findContours(binary, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    return binary, cnts


# --- Group images by number ranges (1-500, 501-1000, ...) ---
from pathlib import Path
import re


def _extract_image_number(p):
    stem = Path(p).stem
    m = re.search(r"(\d+)", stem)
    return int(m.group(1)) if m else None


def group_images_by_number(numbered_paths, group_size=500):
    """Bucket image paths into consecutive number ranges (1-500, 501-1000, ...).

    Args:
        numbered_paths: Iterable of ``(number, path)`` pairs.
        group_size: Width of each number range.

    Returns:
        list: ``(label, [path, ...])`` tuples ordered by the numeric start of the
        range. ``label`` is ``"<start>-<end>"`` with both bounds zero-padded to
        four digits; paths are stored as ``str`` in the order they were supplied.
    """
    buckets = {}
    for number, path in numbered_paths:
        start = ((number - 1) // group_size) * group_size + 1
        buckets.setdefault(start, []).append(str(path))

    # Order by the integer start itself. Re-parsing the label by splitting on "-"
    # fails for image number 0: its range starts at a negative value, so the label
    # begins with "-" and the first token is an empty string.
    return [
        (f"{start:04d}-{start + group_size - 1:04d}", buckets[start])
        for start in sorted(buckets)
    ]


images_dir = Path("data/autumn/images")

# Glob patterns to scan. If you also have .jpg/.jpeg files, add "*.jpg" / "*.jpeg"
# here so they go through the same filtering and sorting as everything else.
image_patterns = ("*.JPG",)

# Keep only files whose stem is purely numeric, ordered by that number.
all_image_paths = sorted(
    (
        p
        for pattern in image_patterns
        for p in images_dir.glob(pattern)
        if re.fullmatch(r"\d+", p.stem)
    ),
    key=lambda p: _extract_image_number(p) or 0,
)

group_size = 500

# Sorted list of (label, [image_path, ...])
_numbered_paths = [(_extract_image_number(p), p) for p in all_image_paths]
image_groups = group_images_by_number(
    [(n, p) for n, p in _numbered_paths if n is not None],
    group_size,
)

# Label -> paths lookup for the same grouping.
_groups = dict(image_groups)

group_labels = [label for label, _ in image_groups]

print(f"Found {len(all_image_paths)} images, grouped into {len(image_groups)} groups")
if image_groups:
    print("First 5 groups:", [(k, len(v)) for k, v in image_groups[:5]])



# Fractal dimension

In [None]:
# Powers of two from 2**1 up to 2**9, handed to combra's fractal-dimension self-test.
sizes = np.power(2, np.arange(1, 10))
test_fractal_dimensions(sizes)

In [None]:
# --- Run configuration ---
types = ['fractal_dimension']
# NOTE(review): `type` shadows the Python builtin for the rest of the kernel session;
# it is kept because the results JSON written further down reads this name.
type = types[0]

# Number of worker processes for the parallel pool.
n_jobs = 20

# --- Per-group accumulators (one entry appended per processed image group) ---
(
    a_list,
    n_images_list,
    n_contours_list,
    fd_mean_list,
    fd_median_list,
    fd_lists,
) = ([] for _ in range(6))

def _get_contours(image_path):
    """Return only the contours that ``preprocess_image`` finds in one image.

    The binary image (first element of the result) is dropped here so that it is
    not transferred back from the worker.
    """
    return preprocess_image(image_path)[1]

# One JSON record per processed group. Records are built inside the loop so every
# label is paired with its own statistics by construction, instead of being zipped
# afterwards with the global `group_labels` (which another cell reassigns).
group_records = []

# Iterate the list directly so tqdm can infer the total number of groups.
for group_label, group_paths in tqdm(image_groups, desc="groups"):

    with WorkerPool(n_jobs=n_jobs, use_dill=False) as pool:
        # Stage 1: collect ALL contours from ALL images in this group (in parallel).
        contours_per_image = pool.map(
            _get_contours,
            group_paths,
            progress_bar=True,
            chunk_size=1,
        )

        all_contours = [c for contours in contours_per_image for c in contours]

        # Stage 2: fractal dimension of every contour (in parallel, no shared_objects).
        # Skipped when the group produced no contours at all.
        if all_contours:
            results = pool.map(
                cimage.contour_fractal_dimension,
                all_contours,
                progress_bar=True,
                chunk_size=64,
            )
        else:
            results = []

    # Contours for which no dimension could be computed come back as None.
    fd_list = [d for d in results if d is not None]
    fd_values = [float(x) for x in fd_list]
    fd_mean = float(np.mean(fd_list)) if fd_list else np.nan
    fd_median = float(np.median(fd_list)) if fd_list else np.nan

    # Keep some diagnostics
    n_images_list.append(len(group_paths))
    n_contours_list.append(len(all_contours))
    fd_mean_list.append(fd_mean)
    fd_median_list.append(fd_median)
    fd_lists.append(fd_values)

    group_records.append(
        {
            "group_label": group_label,
            "n_images": len(group_paths),
            "n_contours": len(all_contours),
            # NaN is stored as null so the summary fields stay valid JSON.
            "fd_mean": None if np.isnan(fd_mean) else fd_mean,
            "fd_median": None if np.isnan(fd_median) else fd_median,
            "fd_list": fd_values,
        }
    )

# Save results to JSON (per-group)
out_path = Path("fractals_results_full.json")
results_json = {
    "type": type,
    "n_jobs": n_jobs,
    "groups": group_records,
}

with open(out_path, "w", encoding="utf-8") as f:
    json.dump(results_json, f, ensure_ascii=False, indent=2)




In [None]:
%%time

# Reload the per-group results written by the fractal-dimension extraction cell,
# so the plotting below can run without repeating the extraction.
in_path = Path("fractals_results_full.json")
results_json = json.loads(in_path.read_text(encoding="utf-8"))

In [None]:
%%time

groups = results_json["groups"]
group_labels = [g["group_label"] for g in groups]
group_positions = range(len(groups))

# Colors per group (will cycle if many groups). The table does not depend on the
# bin size, so it is built once instead of once per step.
colors = plt.cm.tab20(np.linspace(0, 1, max(2, min(20, len(groups)))))

# bin size for stats_preprocess
for step in tqdm([0.01, 0.05, 0.1, 0.2, 0.3]):

    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    # Fitted parameter `a` per group for this step (NaN where a group has no data).
    a_list = []

    for idx, g in enumerate(groups):
        fd_list = g.get("fd_list", []) or []
        color = colors[idx % len(colors)]

        if not fd_list:
            a_list.append(np.nan)
            continue

        x_orig, y_orig = cstats.stats_preprocess(fd_list, step)

        # Exponential distribution only
        (x_fit, y_fit), a, amp_exp = capprox.exponential_approx(
            x_orig, y_orig, a=1, amp=1, x_lim=[1, 2], N=20
        )
        a_list.append(a)

        label = f"{g['group_label']} | a={a:.4f}"

        # Dashed line: fitted curve; markers: the binned data it was fitted to.
        axes[0].plot(x_fit, y_fit, '--', linewidth=2, color=color)
        axes[0].plot(x_orig, y_orig, '-o', color=color, label=label)

    axes[0].set_title(f'exponential fit, step={step}', fontsize=15)
    axes[0].set_xlim(1, 2)
    axes[0].set_ylim(1e-8, 1)
    axes[0].set_yscale('log')
    axes[0].set_ylabel('p(x)', fontsize=15)
    axes[0].legend(loc='upper right', fontsize=10)

    # Diagnostics: parameter a per group
    axes[1].plot(group_positions, a_list, '-o', label='a')
    axes[1].set_title(f'exponential parameter a, step={step}', fontsize=15)
    axes[1].set_xticks(group_positions)
    axes[1].set_xticklabels(group_labels, rotation=45, fontsize=12, ha='right')
    axes[1].legend(loc='upper right', fontsize=12)

    plt.tight_layout()
    plt.show()
    # Release the figure so figures do not accumulate across steps.
    plt.close(fig)


In [None]:
# Debug inspection: `y_orig` is a leftover from the plotting loop above, i.e. the
# second value returned by the most recent `cstats.stats_preprocess(fd_list, step)`
# call (the last group with a non-empty fd_list, at the last step).
# Unless it is defined elsewhere, the name does not exist if that cell has not run
# or if every group had an empty fd_list.
y_orig