In [None]:
!pip install torch torchvision rasterio numpy matplotlib

# Используемые библиотеки

In [None]:
import os
from PIL import Image, ImageFont, ImageDraw
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import tifffile
import os
import zipfile
import cv2
from google.colab.patches import cv2_imshow
import csv
import rasterio
import math
from sklearn.metrics import matthews_corrcoef
from tqdm import tqdm

# Предобработка

#Изменение tiff файлов для проверки гипотез:

Изменяем tiff в jpg

In [None]:
# Достаем из tiff первые 3 потока для изменения tiff в jpg
for i in range(21):
    folder_name = i
    folder_name_str = f'merged/{folder_name:02}.tiff'
    with tifffile.TiffFile(folder_name_str) as tif:
        image_array = tif.asarray()

    r = image_array[:, :, 0]
    g = image_array[:, :, 1]
    b = image_array[:, :, 2]

    rgb_image = np.stack([r, g, b], axis=-1)
    # Сохраняем jpg в папку с изображениями
    rgb_image = rgb_image - np.min(rgb_image)
    rgb_image = rgb_image / np.max(rgb_image)
    rgb_image = (rgb_image * 255).astype(np.uint8)
    image_pil = Image.fromarray(rgb_image)
    image_pil.save(f'./jpgs/{folder_name:02}.jpg')

Отделяем маски от tiff

In [None]:
# Достаем из tiff маску
for i in range(21):
    folder_name_str = f'merged/{i:02}.tiff'
    with tifffile.TiffFile(folder_name_str) as tif:
        image_array = tif.asarray()

    mask = image_array[:, :, 4]
    mask = mask - np.min(mask)
    mask = mask / np.max(mask)
    mask = (mask * 255).astype(np.uint8)

    # Сохранение маски как черно-белого изображения
    mask_pil = Image.fromarray(mask, mode='L')
    mask_pil.save(f'masks/{i:02}.jpg')


Создание отдельной папки для tiff файлов

In [None]:
import os
import shutil

# Папка, где находятся все исходные папки
source_dir = 'minprirody_train/train'

# Папка, куда будут перемещены и переименованы файлы
destination_dir = 'merged'

# Создаем папку назначения, если она не существует
if not os.path.exists(destination_dir):
    os.makedirs(destination_dir)

# Проходим по папкам от 00 до 20
for folder_name in range(21):
    folder_name_str = f'{folder_name:02}'  # Преобразуем число в строку с двумя цифрами
    folder_path = os.path.join(source_dir, folder_name_str)

    # Проверяем, существует ли папка
    if os.path.isdir(folder_path):
        # Предполагаем, что в каждой папке только один файл .tiff
        files = [f for f in os.listdir(folder_path) if f.endswith('.tiff')]

        if files:
            # Переименовываем и перемещаем файл
            file_path = os.path.join(folder_path, files[0])
            new_file_name = f'{folder_name_str}.tiff'
            new_file_path = os.path.join(destination_dir, new_file_name)
            shutil.copy(file_path, new_file_path)  # Копируем файл в новую папку
        else:
            print(f'No .tiff files found in folder {folder_path}')

print("Files have been successfully merged.")

Формируем методату из tiff

In [None]:
# Путь к папке с TIFF файлами
folder_path = 'merged'

# Список для хранения данных
data = []

# Проходим по всем файлам в папке merged
for file_name in os.listdir(folder_path):
    file_path = os.path.join(folder_path, file_name)

    # Открываем каждый TIFF-файл с помощью rasterio
    with rasterio.open(file_path) as dataset:
        # Получаем BoundingBox
        bbox = dataset.bounds
        # Вычисляем ширину и высоту
        width, height = dataset.width, dataset.height
        # Получаем координаты верхнего левого угла
        top_left_lng, top_left_lat = bbox.left, bbox.top

        # Добавляем информацию в список
        data.append([file_name, width, height, top_left_lng, top_left_lat])

# Создаем DataFrame с использованием pandas
df = pd.DataFrame(data,
                  columns=['file_name', 'width', 'height', 'top_left_lng',
                           'top_left_lat'])

# Сохраняем данные в CSV файл
df.to_csv('metadata.csv', index=False)

Аугментация данных при помощи "нарезки" масок исходных tiff на менее масштаные снимки

In [None]:
# Функция для разрезания изображения на куски
def split_image(image, chunk_size):
    img_width, img_height = image.size
    chunks = []
    for i in range(0, img_width, chunk_size):
        for j in range(0, img_height, chunk_size):
            box = (i, j, i + chunk_size, j + chunk_size)
            chunk = image.crop(box)
            chunks.append(chunk)
    return chunks

# Папки с исходными данными и новые папки
input_images_folder = 'jpgs'
input_masks_folder = 'masks'
output_folder = 'new_data'
output_images_folder = os.path.join(output_folder, 'images')
output_masks_folder = os.path.join(output_folder, 'masks')

# Создание новых папок
os.makedirs(output_images_folder, exist_ok=True)
os.makedirs(output_masks_folder, exist_ok=True)

# Разрезаем изображения и маски
def process_folder(input_folder, output_folder, chunk_size):
    for filename in os.listdir(input_folder):
        if filename.endswith('.jpg') or filename.endswith('.png'):
            file_path = os.path.join(input_folder, filename)
            image = Image.open(file_path)
            chunks = split_image(image, chunk_size)

            for idx, chunk in enumerate(chunks):
                chunk_filename = f"{filename[:-4]}_{idx}.jpg"
                chunk_path = os.path.join(output_folder, chunk_filename)
                chunk.save(chunk_path)

# Обработка изображений и масок
process_folder(input_images_folder, output_images_folder, 256)
process_folder(input_masks_folder, output_masks_folder, 256)

print("Разрезка изображений и масок завершена.")

# Использование U-Net для сигментации изображения и создания масок

In [None]:
# Класс U-NET
class UNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UNet, self).__init__()

        self.encoder1 = self.conv_block(in_channels, 64)
        self.encoder2 = self.conv_block(64, 128)
        self.encoder3 = self.conv_block(128, 256)
        self.encoder4 = self.conv_block(256, 512)

        self.pool = nn.MaxPool2d(2, 2)

        self.middle = self.conv_block(512, 1024)

        self.upconv4 = self.upconv_block(1024, 512)
        self.decoder4 = self.conv_block(1024, 512)
        self.upconv3 = self.upconv_block(512, 256)
        self.decoder3 = self.conv_block(512, 256)
        self.upconv2 = self.upconv_block(256, 128)
        self.decoder2 = self.conv_block(256, 128)
        self.upconv1 = self.upconv_block(128, 64)
        self.decoder1 = self.conv_block(128, 64)

        self.final_conv = nn.Conv2d(64, out_channels, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

    def upconv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        # Encoder
        enc1 = self.encoder1(x)
        enc2 = self.pool(enc1)
        enc2 = self.encoder2(enc2)
        enc3 = self.pool(enc2)
        enc3 = self.encoder3(enc3)
        enc4 = self.pool(enc3)
        enc4 = self.encoder4(enc4)

        # Middle
        middle = self.pool(enc4)
        middle = self.middle(middle)

        # Decoder
        up4 = self.upconv4(middle)
        cat4 = torch.cat([up4, enc4], dim=1)
        dec4 = self.decoder4(cat4)

        up3 = self.upconv3(dec4)
        cat3 = torch.cat([up3, enc3], dim=1)
        dec3 = self.decoder3(cat3)

        up2 = self.upconv2(dec3)
        cat2 = torch.cat([up2, enc2], dim=1)
        dec2 = self.decoder2(cat2)

        up1 = self.upconv1(dec2)
        cat1 = torch.cat([up1, enc1], dim=1)
        dec1 = self.decoder1(cat1)
        print(dec1.shape)

        out = self.final_conv(dec1)
        return out

U-Net доказал свою эффективность во многих сферах на всевозможных соревнованиях где требовалась сигментация данныых и создание масок. Именно по этой причине мы решили реализовать данную модель

Предобработка input данных для U-Net

In [None]:
# Класс для сегментации датасета
class SegmentationDataset(Dataset):
    def __init__(self, images_dir, masks_dir, transform=None):
        self.images_dir = images_dir
        self.masks_dir = masks_dir
        self.transform = transform

        self.image_filenames = sorted(os.listdir(images_dir))
        self.mask_filenames = sorted(os.listdir(masks_dir))

        assert len(self.image_filenames) == len(self.mask_filenames),

    def __len__(self):
        return len(self.image_filenames)

    def __getitem__(self, idx):
        img_name = os.path.join(self.images_dir, self.image_filenames[idx])
        mask_name = os.path.join(self.masks_dir, self.mask_filenames[idx])

        image = Image.open(img_name).convert('RGB')
        mask = Image.open(mask_name).convert('L')

        if self.transform:
            image = self.transform(image)
            mask = self.transform(mask)

        # Преобразуем маску в бинарный формат и удаляем лишнюю размерность
        mask = torch.where(mask > 0, torch.tensor(1.0), torch.tensor(0.0))
        mask = mask.squeeze(0)  # Удаляем размерность каналов, если есть

        return image, mask

# Примеры трансформаций для предварительной обработки изображений
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor()
])

Создание датасета и выборок

In [None]:
images_dir = '/content/pics'
masks_dir = '/content/masks'

# Создаем экземпляр датасета и DataLoader
dataset = SegmentationDataset(images_dir, masks_dir, transform=transform)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

Обучение U-Net на исходных данных

In [None]:
# Устанавливаем устройство для вычислений (GPU или CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Создаем экземпляр модели UNet
model = UNet(in_channels=3, out_channels=1).to(device)

# Определяем функцию потерь и оптимизатор
criterion = nn.BCEWithLogitsLoss()  # Используем BCEWithLogitsLoss для бинарной классификации
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Определяем количество эпох
num_epochs = 100

# Цикл обучения
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    # Перебираем данные из DataLoader
    for images, masks in tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        images, masks = images.to(device), masks.to(device)

        # Обратный проход
        optimizer.zero_grad()

        # Прямой проход
        outputs = model(images)

        # Вычисляем потерю
        masks = masks.unsqueeze(1)  # Добавляем размерность к маске, чтобы она совпадала с размером выхода
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)

    epoch_loss = running_loss / len(dataset)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")
    if epoch % 10 == 0:
        torch.save(model.state_dict(), f'unet_model{epoch}.pth')

# Сохраняем модель
torch.save(model.state_dict(), 'unet_model.pth')
print("Model saved to 'unet_model.pth'")

#  После обучение можно констатировать, что U-Net при данном объеме данных не способен выдовать нужную точность при создании масок






---



#После повторного анализа исходных данных, мы пришли к решению оперировать параметром ИК для создания масок

Констатные значения

In [None]:
ML = 0.00025  # коэффициент масштабирования радиометрии
AL = 0.05  # коэффициент смещения радиометрии
K1 = 765.0  # калибровочная константа K1
K2 = 1275.0  # калибровочная константа K2
epsilon = 0.93  # излучательная способность
lambda_ir = 10.9 * 10 ** -6  # длина волны ИК канала (мкм)
rho = 1.438 * 10 ** -2

Подсчет метрики

In [None]:
# Подсчет коэфициета Метьюса
def calculate_mcc(true_mask, predicted_mask):
    return matthews_corrcoef(true_mask.flatten(), predicted_mask.flatten())

# Скейлим MCC
def scale_mcc(mcc):
    return (mcc + 1) / 2

Формируем датасет

In [None]:
# Сбор информации из .tiff файла
class TiffData:
    def __init__(self, file_path):
        with rasterio.open(file_path) as dataset:
            self.infrared_layer = dataset.read(4)
            self.green_layer = dataset.read(2)
            self.true_fire_mask = dataset.read(5)
            self.dataset = dataset

Алгоритм подбора оптимальных параметров

In [None]:
# Функция для поиска лучших гиперпараметров
def find_best_borders(tiff_files, step=0.1, delta=0.05):
    best_mcc = -1
    best_green_border = None
    best_lst_border = None

    # Загрузка всех данных в память
    tiff_data_list = [TiffData(file_path) for file_path in tiff_files]

    # Перебор значений green_border и lst_border
    for green_border in tqdm(np.arange(0, 1 + step, step),
                             desc="Перебор значений green_border"):
        for lst_border in tqdm(np.arange(0, 1 + step, step),
                               desc="Перебор значений lst_border",
                               leave=False):
            total_mcc = 0
            count = 0

            for tiff_data in tiff_data_list:
                # Считываем слои
                infrared_layer = tiff_data.infrared_layer
                green_layer = tiff_data.green_layer
                true_fire_mask = tiff_data.true_fire_mask

                # Преобразуем DN в радиометрическое излучение
                radiance = ML * infrared_layer + AL
                brightness_temp = K2 / (np.log((K1 / radiance) + 1))
                lst = brightness_temp / (1 + (
                            lambda_ir * brightness_temp / rho) * np.log(
                    epsilon))
                lst_celsius = lst - 273.15
                lst_celsius_normalized = (
                                                     lst_celsius - lst_celsius.min()) / (
                                                     lst_celsius.max() - lst_celsius.min())

                # Создаем маску на основе границ
                final_mask = create_mask(green_layer,
                                         lst_celsius_normalized,
                                         green_border, lst_border)

                # Вычисляем коэффициент Метьюса
                mcc = calculate_mcc(true_fire_mask, final_mask)
                scaled_mcc = scale_mcc(mcc)  # Применяем масштабирование
                total_mcc += scaled_mcc
                count += 1

            # Средний коэффициент Метьюса
            average_mcc = total_mcc / count

            # Проверяем, является ли текущий результат лучшим
            if average_mcc > best_mcc:
                best_mcc = average_mcc
                best_green_border = green_border
                best_lst_border = lst_border
                print(
                    f"Улучшение результата: Green Border = {best_green_border}, LST Border = {best_lst_border}, MCC = {best_mcc}")

    # Перебор значений с ±delta от найденных лучших границ
    for green_border in [best_green_border + delta, best_green_border,
                         best_green_border - delta]:
        for lst_border in [best_lst_border + delta, best_lst_border,
                           best_lst_border - delta]:
            total_mcc = 0
            count = 0
            for tiff_data in tiff_data_list:
                # Считываем слои
                infrared_layer = tiff_data.infrared_layer
                green_layer = tiff_data.green_layer
                true_fire_mask = tiff_data.true_fire_mask

                # Преобразуем DN в радиометрическое излучение
                radiance = ML * infrared_layer + AL
                brightness_temp = K2 / (np.log((K1 / radiance) + 1))
                lst = brightness_temp / (1 + (
                            lambda_ir * brightness_temp / rho) * np.log(
                    epsilon))
                lst_celsius = lst - 273.15
                lst_celsius_normalized = (
                                                     lst_celsius - lst_celsius.min()) / (
                                                     lst_celsius.max() - lst_celsius.min())

                # Создаем маску на основе границ
                final_mask = create_mask(green_layer,
                                         lst_celsius_normalized,
                                         green_border, lst_border)

                # Вычисляем коэффициент Метьюса
                mcc = calculate_mcc(true_fire_mask, final_mask)
                scaled_mcc = scale_mcc(mcc)  # Применяем масштабирование
                total_mcc += scaled_mcc
                count += 1

            # Средний коэффициент Метьюса
            average_mcc = total_mcc / count

            # Проверяем, является ли текущий результат лучшим
            if average_mcc > best_mcc:
                best_mcc = average_mcc
                best_green_border = green_border
                best_lst_border = lst_border
                print(
                    f"Улучшение результата после уточнения: Green Border = {best_green_border}, LST Border = {best_lst_border}, MCC = {best_mcc}")

    return best_green_border, best_lst_border, best_mcc

Output алгоритма по созданию масок

In [None]:

tiff_files = [f"merged/{i:02d}.tiff" for i in range(21)]

# Поиск лучших границ
best_green_border, best_lst_border, best_mcc = find_best_borders(tiff_files)

print(f"Best Green Border: {best_green_border}")
print(f"Best LST Border: {best_lst_border}")
print(f"Best Matthews Correlation Coefficient: {best_mcc}")

##Имплементация алгоритма по генерации масок

Имплементация и использование предобученного UNet для определения процента изображения покрытого лесом

In [None]:
# Предобученная Ю-нет для подсчета процента деревьев на изображении
class UNet:
    def __init__(self):
        self.model = smp.Unet(encoder_name='resnet34',
                              encoder_weights='imagenet',
                              classes=1)
        self.model.eval()
        # preprocess для обработки входного изображения
        self.preprocess = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor()
        ])

    def predict(self, image_pil):
        input_image = self.preprocess(image_pil).unsqueeze(0)

        with torch.no_grad():
            output = self.model(input_image)
        # Подсчет доли деревьев на снимке
        mask = (output.squeeze().numpy() > 0.5).astype(np.uint8)
        total_pixels = mask.size
        object_pixels = np.sum(mask)
        percentage_objects = (object_pixels / total_pixels) * 100

        return percentage_objects

Расчет fire_score исходя из данных

In [None]:
# Подсчет fire_score для формулы
def calculate_fire_score(percentage_trees, humidity,
                         wind_speed):
    risk = 0
    risk += 0.5 * ((100 - humidity) / 100)
    risk += 0.5 * (wind_speed / 20)
    risk += 0.1 * (percentage_trees / 100)
    return np.clip(risk, 0, 1)


Расчет маски

In [None]:
# Создание маски на основе инфрокрасного и rgb изображений
def create_mask(green_layer, lst_celsius_normalized, green_border, lst_border):
    # Создаем условие для создания маски: зеленый канал должен быть больше green_border,
    # а нормализованная температура должна быть больше lst_border
    condition = (green_layer > green_border) & (lst_celsius_normalized > lst_border)
    # Возвращаем маску, где 1 соответствует условию, а 0 - не соответствует
    return np.where(condition, 1, 0)

# Нормализация данных для подсчета маски
def get_mask(file_path, humidity, wind_speed, unet: UNet):
    # Задаем границы для зеленого канала и температуры
    green_border = 0.05
    lst_border = 0.55

    # Создаем объект TiffData для работы с TIFF файлом
    tiff_data = TiffData(file_path)

    # Посчет коэфицентов необходимых для формулы
    # Радиометрическая коррекция инфракрасного канала
    radiance = ML * tiff_data.infrared_layer + AL
    # Расчет яркостной температуры
    brightness_temp = K2 / (np.log((K1 / radiance) + 1))
    # Расчет температуры поверхности земли (LST)
    lst = brightness_temp / (1 + (lambda_ir * brightness_temp / rho) * np.log(epsilon))

    # Расчет оценки пожара с помощью сети UNet
    fire_score = calculate_fire_score(unet.predict(tiff_data.image), humidity, wind_speed)

    # Нормализация температуры поверхности земли с учетом оценки пожара
    lst_normalized = ((lst - lst.min()) / (lst.max() - lst.min())) * (1 + fire_score * 0.01)

    # Создаем маску на основе зеленого канала и нормализованной температуры
    mask = create_mask(tiff_data.green_layer, lst_normalized, green_border, lst_border) * 255

    # Преобразуем маску в изображение
    image = Image.fromarray(mask.astype(np.uint8), mode="L")

    # Возвращаем маску и исходное RGB изображение
    return image, tiff_data.image


Наложение получившенийся маски и доп данных на исходное изображение

In [None]:
def color_mask(image, mask, color=(255, 0, 0)):
    image = image.convert("RGB")
    mask = mask.convert("L")
    color_image = Image.new("RGB", image.size, color)
    alpha_mask = mask.point(lambda p: p > 128 and 255)  # Преобразуем белые пиксели в альфа-канал
    result_image = Image.composite(color_image, image, alpha_mask)
    return result_image

def draw_wind_arrow(img, wind_speed, wind_direction, arrow_color=(255, 255, 255), text_size=20):
    draw = ImageDraw.Draw(img)
    width, height = img.size
    center_x, center_y = width // 2, height // 2

    # Длина стрелки
    arrow_length = int(min(width, height) * 0.2)

    # Начальная и конечная точка стрелки
    start_x = center_x - (arrow_length / 2) * math.cos(math.radians(wind_direction))
    start_y = center_y + (arrow_length / 2) * math.sin(math.radians(wind_direction))
    end_x = center_x + (arrow_length / 2) * math.cos(math.radians(wind_direction))
    end_y = center_y - (arrow_length / 2) * math.sin(math.radians(wind_direction))

    # Рисуем линию (тело стрелки)
    draw.line((start_x, start_y, end_x, end_y), fill=arrow_color, width=5)

    # Величина и угол крыльев стрелки
    arrow_head_size = arrow_length * 0.25  # Длина крыльев = 1/4 от длины стрелки
    arrow_head_angle = 30  # Угол между крыльями и основным направлением стрелки

    # Левое крыло
    left_wing_x = end_x - arrow_head_size * math.cos(math.radians(wind_direction + arrow_head_angle))
    left_wing_y = end_y + arrow_head_size * math.sin(math.radians(wind_direction + arrow_head_angle))

    # Правое крыло
    right_wing_x = end_x - arrow_head_size * math.cos(math.radians(wind_direction - arrow_head_angle))
    right_wing_y = end_y + arrow_head_size * math.sin(math.radians(wind_direction - arrow_head_angle))

    # Рисуем крылья стрелки
    draw.polygon([(end_x, end_y), (left_wing_x, left_wing_y), (right_wing_x, right_wing_y)], fill=arrow_color)

    # Рисуем текст (скорость ветра)
    try:
        font = ImageFont.truetype("arial.ttf", text_size)
    except IOError:
        font = ImageFont.load_default()

    text = f"{wind_speed} m/s"
    text_offset = 25  # Смещение текста от конца стрелки
    text_x = end_x + text_offset * math.cos(math.radians(wind_direction))
    text_y = end_y - text_offset * math.sin(math.radians(wind_direction))

    # Отрисовка текста
    draw.text((text_x, text_y), text, fill=arrow_color, font=font)

    return img


Подсчет метрики на данных

In [None]:
# Подсчет среднего значения Коэфициента Корреляции Метьюса (далее MCC) для файлов train датасета
def get_score(path_to_tiffs='data/merged/',
              weather_data_path='weather_data.csv',
              path_to_masks='data/masks/'):
    # Читаем данные о погоде из CSV файла
    weather_data = pd.read_csv(weather_data_path, index_col='file_id')

    # Создаем экземпляр сети UNet
    unet = UNet()

    # Инициализируем переменную для хранения суммы MCC
    total_mcc = 0

    # Цикл по всем файлам
    for i in range(21):
        img = Image.open(f'{path_to_masks}/{i:02}.jpg').convert('L')
        # Преобразуем маску в numpy массив
        mask = np.array(img)
        # Бинаризуем маску (0 или 1)
        binary_mask = np.where(mask > 128, 1, 0)
        wdata = weather_data.loc[i]
        # Получаем маску для текущего файла с помощью функции get_mask
        mask = get_mask(f'{path_to_tiffs}/{i:02}.tiff',
                        wdata['humidity'], wdata['wind_speed'], unet)
        # Расчет MCC для текущей маски
        mcc = calculate_mcc(binary_mask, mask)
        # Масштабируем MCC
        scaled_mcc = scale_mcc(mcc)
        # Добавляем MCC к сумме
        total_mcc += scaled_mcc
    return total_mcc / 21
