# Задание по курсу "Практическое введение в анализ изображений"

Задание посвящено разработке метода автоматической сегментации минералов на микроскопических изображениях геологических аншлифов. Задание не имеет никаких ограничений по используемым подходам и методам. Тем не менее большинство вещей, расказанных в курсе "Практическое введение в анализ изображений", так или иначе связаны с тематикой задания и могут быть полезны.

В данной реализации используется пакет [`petroscope`](https://github.com/xubiker/petroscope), содержащий набор методов, упрощающих работу с геологическими изображениями. 

В качестве данных для обучения и тестирование в данном задании используется набор изображений [LumenStone](https://imaging.cs.msu.ru/en/research/geology/lumenstone) (подмножество S1v1.5).



Используется реализация классических суперпиксельных алгоритмов из пакета `scikit-image`

Для начала установим пакет `petroscope`:

In [205]:
!pip install petroscope --force-reinstall

Collecting petroscope
  Using cached petroscope-0.0.11-py3-none-any.whl.metadata (7.5 kB)
Collecting pyyaml (from petroscope)
  Using cached PyYAML-6.0.2-cp313-cp313-macosx_10_13_x86_64.whl.metadata (2.1 kB)
Collecting numpy<2.0.0,>=1.16 (from petroscope)
  Using cached numpy-1.26.4-cp313-cp313-macosx_11_0_x86_64.whl
Collecting pillow (from petroscope)
  Using cached pillow-11.1.0-cp313-cp313-macosx_10_13_x86_64.whl.metadata (9.1 kB)
Collecting matplotlib (from petroscope)
  Using cached matplotlib-3.10.1-cp313-cp313-macosx_10_13_x86_64.whl.metadata (11 kB)
Collecting tqdm (from petroscope)
  Using cached tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Collecting scipy (from petroscope)
  Using cached scipy-1.15.2-cp313-cp313-macosx_10_13_x86_64.whl.metadata (61 kB)
Collecting loguru (from petroscope)
  Using cached loguru-0.7.3-py3-none-any.whl.metadata (22 kB)
Collecting prettytable (from petroscope)
  Using cached prettytable-3.16.0-py3-none-any.whl.metadata (33 kB)
Collecting request

код для отображения изображений в jupiter notebook.

In [206]:
import numpy as np
import matplotlib.pyplot as plt

def show(image, title: str = None, cmap: str = None):
    plt.imshow(image, cmap=cmap)
    if title:
        plt.title(title)
    plt.axis('off')
    plt.show()

Пути до датасета LumenStone S1v1.5 и сбалансированных данных. Запись локально.

In [294]:
from pathlib import Path
ds_path = Path('./LumenStone/S1_v1.5/')
b_path = Path("./balanced_data/")
sb_path = Path("./small_balanced_data/")
vsb_path = Path("./very_small_balanced_data/")

информация о классах (номер класса, аббревиатура, название, цвет маски):

In [208]:
from petroscope.segmentation.classes import ClassSet, LumenStoneClasses

classset = LumenStoneClasses.S1v1()
for cl in classset.classes:
    print(cl)

[0, bg (background), color: #000000]
[1, ccp/kub (chalcopyrite/cubanite), color: #ffa500]
[2, gl (galena), color: #9acd32]
[4, brt (bornite), color: #00bfff]
[6, py/mrc (pyrite/marcasite), color: #2f4f4f]
[8, sph (sphalerite), color: #ee82ee]
[11, tnt/ttr (tenantite/tetrahedrite), color: #483d8b]


Несолько подключенний библиотек

In [219]:
!pip install scikit-image

Collecting scikit-image
  Downloading scikit_image-0.25.2-cp313-cp313-macosx_10_13_x86_64.whl.metadata (14 kB)
Collecting networkx>=3.0 (from scikit-image)
  Downloading networkx-3.4.2-py3-none-any.whl.metadata (6.3 kB)
Collecting imageio!=2.35.0,>=2.33 (from scikit-image)
  Downloading imageio-2.37.0-py3-none-any.whl.metadata (5.2 kB)
Collecting tifffile>=2022.8.12 (from scikit-image)
  Downloading tifffile-2025.3.30-py3-none-any.whl.metadata (32 kB)
Collecting lazy-loader>=0.4 (from scikit-image)
  Downloading lazy_loader-0.4-py3-none-any.whl.metadata (7.6 kB)
Downloading scikit_image-0.25.2-cp313-cp313-macosx_10_13_x86_64.whl (13.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.9/13.9 MB[0m [31m34.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading imageio-2.37.0-py3-none-any.whl (315 kB)
Downloading lazy_loader-0.4-py3-none-any.whl (12 kB)
Downloading networkx-3.4.2-py3-none-any.whl (1.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [220]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
import pickle as pkl

from skimage.color import rgb2gray
from skimage.filters import sobel
from skimage.segmentation import watershed

Класс MyGeoSegmModel
Метод load - просто сохранение обьекта через pickle

Обучение: запоминание признаков текстуры разных классов - наборы признаков (признаки GLCM и средний цвет) кладутся в KNeighborsClassifier (далее knn), который и будет предсказывать текстуру

Предсказание: делим изображения на кластеры через метод Watershed, в кластере берем кусок текстуры и предсказываем через knn класс для всего кластера

In [None]:
import petroscope.segmentation as segm
from dataclasses import dataclass
from typing import Iterable
import numpy as np
from tqdm import tqdm
from petroscope.segmentation.utils import load_image, load_mask
from scipy.ndimage import convolve, maximum_filter

class MyGeoSegmModel(segm.GeoSegmModel):

    #meta parameters for training
    block_size = 32
    filler_color = [0,0,0] #[-1000, -1000, -1000]
    filler_color_grey = 0 #-1000
    density_minimum = 0.3
    glcm_vector = [4, 0]
    glcm_levels = 16
    features_count = 4+3+4
    knn_parameter = 3

    #load
    path_to_saved_data = Path('./saved_data/') #'geo_proj_data_16_par.pkl'

    #meta parameters for predicting
    watershed_markers = 128 #1024
    compactness = 0.001
    sampler_size = 32
    sampler_search_step = 4

    def generate_laws_filters(self) -> dict[str, np.ndarray]:
        # Определение одномерных масок Лавса
        filters_1d = {
            "L5": np.array([1, 4, 6, 4, 1], dtype=np.float16),   # Level (L)
            "E5": np.array([-1, -2, 0, 2, 1], dtype=np.float16), # Edge (E)
            "S5": np.array([-1, 0, 2, 0, -1], dtype=np.float16), # Spot (S)
            "R5": np.array([1, -4, 6, -4, 1], dtype=np.float16), # Ripple (R)
            "W5": np.array([-1, 2, 0, -2, 1], dtype=np.float16), # Wave (W)
        }
        filters_2d = {}
        for f_name_1, f_val_1 in filters_1d.items():
            for f_name_2, f_val_2 in filters_1d.items():
                kernel = np.outer(f_val_1, f_val_2)  # Декартово произведение
                name = f"{f_name_1}{f_name_2}"
                filters_2d[name] = kernel
        return filters_2d

    def __init__(self, classes: ClassSet) -> None:
        super().__init__()
        self.classes = classes
        self.clusters = dict()
        self.scaler = StandardScaler()
        self.knn = KNeighborsClassifier(n_neighbors=self.knn_parameter)

        laws_filters = self.generate_laws_filters()
        self.filter = laws_filters["L5E5"]



    def load(self, saved_path = path_to_saved_data, **kwargs) -> None:
        self.knn = pkl.load(open(self.path_to_saved_data / f"knn.pkl", 'rb'))
        self.scaler = pkl.load(open(self.path_to_saved_data / f"scaler.pkl", 'rb'))
        

    def calculate_glcm_features(self, image: np.ndarray, num_levels = glcm_levels, dx = glcm_vector[0], dy = glcm_vector[1]) -> np.ndarray:
        #image = np.floor(image / (256 / num_levels)).astype(int)
        image = (image * (num_levels-1)).astype(int)
        rows, cols = image.shape
        glcm = np.zeros((num_levels, num_levels), dtype=int)
        for i in range(rows - dy):
            for j in range(cols - dx):
                x = image[i, j]
                y = image[i + dy, j + dx]
                # if x >= 0 and y >= 0:
                glcm[x, y] += 1
        glcmn = glcm.astype(np.float32) / np.sum(glcm)
        n = num_levels
        glcmn_2 = glcmn[glcmn > 0]  # Убираем нули
        entropy = -np.sum(glcmn_2 * np.log(glcmn_2))
        contrast = np.sum([(i - j) ** 2 * glcmn[i, j] for i in range(n) for j in range(n)])
        homogeneity = np.sum([glcmn[i, j] / (1 + (i - j) ** 2) for i in range(n) for j in range(n)])
        dissimilarity = np.sum([abs(i - j) * glcmn[i, j] for i in range(n) for j in range(n)])
        return [entropy, contrast, homogeneity, dissimilarity]
    


    def create_features(self, block: np.ndarray, mask: np.ndarray, code:int, density: np.float32) ->np.ndarray:
        grey_block = rgb2gray(block)
        grey_block[mask != code] = self.filler_color_grey
        color = block[mask == code, :].mean(axis=0)
        filtered = np.clip(convolve(grey_block, self.filter, mode='constant', cval=0.0), 0, 1)
        
        return np.concatenate([self.calculate_glcm_features(grey_block), color, self.calculate_glcm_features(filtered)], axis=-1)

    def train(
        self, img_mask_paths: Iterable[tuple[Path, Path]], **kwargs
    ) -> None:
        for img_p, mask_p in tqdm(img_mask_paths):
            img = load_image(img_p, normalize=True)
            mask = load_mask(mask_p, classes=self.classes, one_hot=False)
            #print("img", img)

            features_array = np.array([])
            code_array = np.array([])

            for i in range(0, img.shape[0], self.block_size):
                for j in range(0, img.shape[1], self.block_size):
                    block = img[i:i+self.block_size, j:j+self.block_size]
                    block_mask = mask[i:i+self.block_size, j:j+self.block_size]
                    for code in np.unique(block_mask):    
                        density = block[block_mask == code].size / block.size
                        if density >= self.density_minimum: 
                            #construct and add new element to knn pool:
                            features = self.create_features(block, block_mask, code, density)
                            features_array = np.append(features_array, features)
                            code_array = np.append(code_array, code)


            #print("features array: ", features_array)
            features_array = features_array.reshape(code_array.shape[0], self.features_count)
            features_array = self.scaler.fit_transform(features_array)
            self.knn.fit(features_array, code_array)

            
            #save
            self.path_to_saved_data.mkdir(exist_ok=True)
            pkl.dump(self.knn, open(self.path_to_saved_data / f"knn.pkl", 'wb'), pkl.HIGHEST_PROTOCOL)
            pkl.dump(self.scaler, open(self.path_to_saved_data / f"scaler.pkl", 'wb'), pkl.HIGHEST_PROTOCOL)

        




            

        

    def predict_image(self, image: np.ndarray) -> np.ndarray:
        
        # watershed algorithm:
        gradient = sobel(rgb2gray(image))
        segments_watershed = watershed(gradient, markers=self.watershed_markers, compactness=self.compactness)-1
        result = np.zeros(segments_watershed.shape, dtype=int)
        cluster_count = len(np.unique(segments_watershed))
        # [density and coordinates of corner] of sampler for each cluster
        # we want samplers with the biggest density
        sampler_density = np.zeros((cluster_count, ), dtype=np.float32)
        sampler_coordiantes = np.zeros((cluster_count, 2), dtype=int)

        

        # chosing samplers
        for i in range(0, segments_watershed.shape[0] - self.sampler_size, self.sampler_search_step):
                for j in range(0, segments_watershed.shape[1] - self.sampler_size, self.sampler_search_step):
                    block = segments_watershed[i:i+self.sampler_size, j:j+self.sampler_size]
                    for code in np.unique(block):
                        density = block[block == code].size / block.size
                        if sampler_density[code] < density:
                            sampler_density[code] = density
                            sampler_coordiantes[code] = [i, j]
        


        # predict for each cluster using best sampler (the densest)
        for code in range(cluster_count):
            density = sampler_density[code]
            i, j = sampler_coordiantes[code]
            block = image[i:i+self.sampler_size, j:j+self.sampler_size]
            mask = segments_watershed[i:i+self.sampler_size, j:j+self.sampler_size]
            features = self.create_features(block, mask, code, density)
            features = self.scaler.transform([features])
            result[segments_watershed == code] = self.knn.predict(features)[0]

        return result

Тут я использовал балансировщик из petroscope и сохранил патчи в папку `./balanced_data`:

In [None]:
from pathlib import Path
from petroscope.segmentation.balancer.balancer import SelfBalancingDataset
from PIL import Image


def img_mask_pairs(ds_dir: Path):
    img_dir = ds_dir / "imgs" / "train"
    mask_dir = ds_dir / "masks" / "train"
    img_mask_p = [
        (img_p, mask_dir / f"{img_p.stem}.png")
        for img_p in sorted(img_dir.iterdir())
    ]
    return img_mask_p


def run_balancer(iterations=10, save_patches=True):

    exp_dir = Path("./very_small_balanced_data")
    exp_dir.mkdir(parents=True, exist_ok=True)

    ds = SelfBalancingDataset(
        img_mask_paths=img_mask_pairs(Path("LumenStone/S1_v1.5/")),
        patch_size=256,
        augment_rotation=30,
        augment_scale=0.1,
        cls_indices=list(range(16)),
        class_area_consideration=1.5,
        patch_positioning_accuracy=0.8,
        balancing_strength=0.75,
        acceleration=8,
        cache_dir=Path(".") / "cache",
    )

    s = ds.sampler_balanced()
    for i in tqdm(range(iterations), "extracting patches"):

        img, msk = next(s)
        if save_patches:
            (exp_dir / "imgs/").mkdir(exist_ok=True)
            (exp_dir / "imgs/train/").mkdir(exist_ok=True)
            (exp_dir / "masks/").mkdir(exist_ok=True)
            (exp_dir / "masks/train/").mkdir(exist_ok=True)
            Image.fromarray(img).save(exp_dir / f"imgs/train/train_{i+1}.png")
            Image.fromarray(msk).save(exp_dir / f"masks/train/train_{i+1}.png")

    print(ds.accum)
    


run_balancer()

[32m2025-04-08T02:30:07.417441+0300[0m [1mINFO[0m [36mInitializing dataset...[0m
loading images:  15%|█▌        | 9/59 [00:03<00:20,  2.48it/s]


KeyboardInterrupt: 

In [None]:
from pathlib import Path
from petroscope.segmentation.balancer.balancer import SelfBalancingDataset
from PIL import Image


def img_mask_pairs(ds_dir: Path):
    img_dir = ds_dir / "imgs" / "train"
    mask_dir = ds_dir / "masks" / "train"
    img_mask_p = [
        (img_p, mask_dir / f"{img_p.stem}.png")
        for img_p in sorted(img_dir.iterdir())
    ]
    return img_mask_p


def run_balancer(iterations=10, save_patches=True):

    exp_dir = Path("./very_small_balanced_data")
    exp_dir.mkdir(parents=True, exist_ok=True)

    ds = SelfBalancingDataset(
        img_mask_paths=img_mask_pairs(Path("LumenStone/S1_v1.5/")),
        patch_size=256,
        augment_rotation=30,
        augment_scale=0.1,
        cls_indices=list(range(16)),
        class_area_consideration=1.5,
        patch_positioning_accuracy=0.8,
        balancing_strength=0.75,
        acceleration=8,
        cache_dir=Path(".") / "cache",
    )

    s = ds.sampler_balanced()
    for i in tqdm(range(iterations), "extracting patches"):

        img, msk = next(s)
        if save_patches:
            (exp_dir / "imgs/").mkdir(exist_ok=True)
            (exp_dir / "imgs/train/").mkdir(exist_ok=True)
            (exp_dir / "masks/").mkdir(exist_ok=True)
            (exp_dir / "masks/train/").mkdir(exist_ok=True)
            Image.fromarray(img).save(exp_dir / f"imgs/train/train_{i+1}.png")
            Image.fromarray(msk).save(exp_dir / f"masks/train/train_{i+1}.png")

    print(ds.accum)
    


run_balancer()

Список пар (путь до изображения, путь до маски) для обучающей (отдельно для сбалансированных данных) и тестовой выборок

In [311]:
# fill correct path to the dataset
train_img_mask_p = [
    (img_p, ds_path / "masks" / "train" / f"{img_p.stem}.png")
    for img_p in sorted((ds_path / "imgs" / "train").iterdir())
]

balanced_train_img_mask_p = [
    (img_p, vsb_path / "masks" / "train" / f"{img_p.stem}.png")
    for img_p in sorted((vsb_path / "imgs" / "train").iterdir())
]

j_test_img_mask_p = [
    (img_p, sb_path / "masks" / "train" / f"{img_p.stem}.png")
    for img_p in sorted((sb_path / "imgs" / "train").iterdir())
]

test_img_mask_p = [
    (img_p, ds_path / "masks" / "test" / f"{img_p.stem}.png")
    for img_p in sorted((ds_path / "imgs" / "test").iterdir())
]

Создаем класс 

In [342]:
model = MyGeoSegmModel(classes=classset)

Обучаем

In [343]:
#train:
model.train(img_mask_paths=balanced_train_img_mask_p[:]) 

100%|██████████| 10/10 [00:03<00:00,  2.70it/s]


Или можно загрузить из файла `geo_proj_data.pkl` вместо обучения

In [318]:
#or load:
model.load()

Запуск тестирования модели

In [345]:
from petroscope.segmentation.eval import SegmDetailedTester


tester = SegmDetailedTester(
    Path("output"),
    classes=classset,
    void_pad=0,
    void_border_width=4,
    vis_plots=False,
    vis_segmentation=True,
)

res, res_void = tester.test_on_set(
    #j_test_img_mask_p [0:10], #
    test_img_mask_p[:1], # remove limit in future!
    lambda img: model.predict_image(img),
    description="test",
    return_void=True,
)

print(f"Metrics:\n{res}")
print(f"Metrics with void borders:\n{res_void}")

testing: 100%|██████████| 1/1 [00:41<00:00, 41.83s/it]

Metrics:
	 iou [soft]:
		 bg: 0.0000 [0.0000]
		 brt: 1.0000 [1.0000]
		 ccp/kub: 0.0000 [0.0000]
		 gl: 0.0031 [0.0031]
		 py/mrc: 0.0000 [0.0000]
		 sph: 0.0000 [0.0000]
		 tnt/ttr: 0.0000 [0.0000]
	 mean iou [soft]: 0.1433 [0.1433]
	 acc: 0.0031

Metrics with void borders:
	 iou [soft]:
		 bg: 0.0000 [0.0000]
		 brt: 1.0000 [1.0000]
		 ccp/kub: 0.0000 [0.0000]
		 gl: 0.0029 [0.0029]
		 py/mrc: 0.0000 [0.0000]
		 sph: 0.0000 [0.0000]
		 tnt/ttr: 0.0000 [0.0000]
	 mean iou [soft]: 0.1433 [0.1433]
	 acc: 0.0828






В директории ```./output``` по результатам тестирования можете найти визуализацию сегментаций и текстовые файлы со значениями метрик.

## Что нужно теперь сделать?

Вам необходимо реализовать собственный метод автоматической сегментиации минералов. Разработанное решение должно быть оформлено в виде класса, отнаследованного от ```GeoSegmModel``` из пакета ```petroscope``` (как в примере выше).

Обратите внимание, что реализованный класс должен поддерживать автоматическое сохранение и загрузку модели (метод ```load```). Это позволит протестировать решение, не обучая модель заново.

Ваша цель - добиться как можно более высоких показаний метрик сегментации на тестовой выборке (тестовую выборку нельзя использовать при обучении или валидации!), ключевой является метрика mean_iou в режиме void_borders.

Решенные задания присылайте в виде ссылок на github репозиторий, или непосредственно ipynb ноутбуки. Обязательно проверьте воспроизводимость кода, чтобы я мог запустить вашу обученную модель!

<font color="red">
Рекомендуется после получения пайплайна с полными результатами обучения экспортировать ноутбук в pdf (файл -> печать) и положить этот pdf в репозиторий вместе с самим ноутбуком.
</font>

Удачи!