In [25]:
import numpy as np
import rasterio
from prompt_toolkit.utils import to_str
from rasterio.enums import Resampling
import matplotlib.pyplot as plt
from PIL import Image
import os
from time import time

from calculate_metrics import calculate_f1_score

In [26]:
# Параметры палитры для маски
PALLETE = [
    [0, 0, 0],  # не вода - черный цвет
    [0, 0, 255]  # вода - синий цвет
]

In [27]:
# Функции для расчета водных индексов
def calculate_indices(blue, green, red, nir, mir, swir):
    indices = {
        'NDWI': (green - nir) / (green + nir),
        'NDMI': (nir - mir) / (nir + mir),
        'MNDWI': (green - mir) / (green + mir),
        'WRI': (green + red) / (nir + mir),
        'NDVI': (nir - red) / (nir + red),
        'AWEI': 4 * (green - mir) - (0.25 * nir + 2.75 * swir)
    }
    return indices

In [51]:
# Функция для создания маски воды на основе порогов индексов
def create_water_mask(indices):
    # Условия для каждого индекса
    ndwi_mask = indices['NDWI'] < 1   #74
    ndmi_mask = indices['NDMI'] < -6.48   #50
    mndwi_mask = indices['MNDWI'] < 1   #70
    wri_mask = indices['WRI'] > 1      #70
    ndvi_mask = indices['NDVI'] > 1    #70
    awei_mask = indices['AWEI'] < 4.35   #62

    # Объединяем условия для создания водной маски
    water_mask = (ndwi_mask | mndwi_mask | wri_mask | ndvi_mask | awei_mask | ndmi_mask).astype(np.uint8)
    return water_mask

In [29]:
# Функции для нормализации и увеличения яркости
def normalize(band):
    band_min, band_max = band.min(), band.max()
    return (band - band_min) / (band_max - band_min)

def brighten(band):
    alpha = 0.13
    beta = 0
    return np.clip(alpha * band + beta, 0, 255)
# Функция для преобразования изображения
def convert(im_path):
    with rasterio.open(im_path) as fin:
        red = fin.read(3)
        green = fin.read(2)
        blue = fin.read(1)

    red_b = brighten(red)
    green_b = brighten(green)
    blue_b = brighten(blue)

    red_bn = normalize(red_b)
    green_bn = normalize(green_b)
    blue_bn = normalize(blue_b)

    return np.dstack((blue_b, green_b, red_b)), np.dstack((red_bn, green_bn, blue_bn))

In [30]:
# Основная функция для обработки изображения Sentinel-2A и создания масок
def process_image(image_path, output_mask_path, reference_mask_path=None):
    with rasterio.open(image_path) as src:
        # Считывание необходимых каналов с понижением разрешения до 20 м
        blue = src.read(1)
        green = src.read(2)
        red = src.read(3)
        nir = src.read(7)
        mir = src.read(9)
        swir = src.read(10)

        # Расчет индексов
        indices = calculate_indices(blue, green, red, nir, mir, swir)

        # Создание водной маски
        water_mask = create_water_mask(indices)

        # Сохранение маски в формате .tif
        with rasterio.open(
            output_mask_path,
            'w',
            driver='GTiff',
            height=water_mask.shape[0],
            width=water_mask.shape[1],
            count=1,
            dtype=water_mask.dtype,
            crs=src.crs,
            transform=src.transform
        ) as dst:
            dst.write(water_mask, 1)

    # Визуализация исходного изображения и маски воды
    #plot_data(image_path, output_mask_path)

In [31]:
# Визуализация исходного изображения и маски воды
def plot_data(image_path, mask_path):
    plt.figure(figsize=(12, 12))
    pal = [value for color in PALLETE for value in color]

    # Исходное изображение
    plt.subplot(1, 2, 1)
    _, img = convert(image_path)
    plt.imshow(img)
    plt.title('Исходное изображение')

# Маска воды
    plt.subplot(1, 2, 2)
    with rasterio.open(mask_path) as fin:
        mask = fin.read(1)
    mask = Image.fromarray(mask).convert('P')
    mask.putpalette(pal)
    plt.imshow(mask)
    plt.title('Маска воды')
    plt.show()

In [52]:
# Путь к изображению и пути для сохранения маски
image_path = 'gistograms/images'
output_mask_path = 'gistograms/mask_pred'
masks_path = "gistograms/masks"

# Запуск обработки
#process_image("gistograms/images/1.tif", "gistograms/mask_pred/1.tif")

files_name = ['1.tif', '2.tif', '4.tif', '5.tif', '6_1.tif', '6_2.tif', '9_1.tif', '9_2.tif']
end_f1_metric = 0

print(f"F1 score with all trash holders")
start_time = time()
for file_name in files_name:
    print(f"Mask: {file_name}")
    start_mask_time = time()
    
    res_image_path = os.path.join(image_path, file_name)
    res_mask_path = os.path.join(output_mask_path, file_name)
    process_image(res_image_path, res_mask_path)
    
    file_1 = os.path.join(masks_path, file_name)
    file_2 = os.path.join(output_mask_path, file_name)
    
    f1_metric = calculate_f1_score(file_1, file_2)
    end_mask_time = time()
    end_f1_metric += f1_metric
    print(f"Time to create mask: {end_mask_time - start_mask_time :.2f} \t F1 metric: {f1_metric :.2f}")
end_time = time()

print(f"\nAll time to create masks: {end_time - start_time :.2f}  F1 metric: {end_f1_metric / 8:.2f}")

F1 score with all trash holders
Mask: 1.tif
Time to create mask: 24.71 	 F1 metric: 0.91
Mask: 2.tif
Time to create mask: 3.37 	 F1 metric: 0.63
Mask: 4.tif
Time to create mask: 5.56 	 F1 metric: 0.67
Mask: 5.tif
Time to create mask: 1.44 	 F1 metric: 0.84
Mask: 6_1.tif
Time to create mask: 11.49 	 F1 metric: 0.75
Mask: 6_2.tif
Time to create mask: 11.44 	 F1 metric: 0.93
Mask: 9_1.tif
Time to create mask: 0.06 	 F1 metric: 0.77
Mask: 9_2.tif
Time to create mask: 0.06 	 F1 metric: 0.83

All time to create masks: 58.13  F1 metric: 0.79


In [34]:
import os
import rasterio
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score


In [36]:
def calculate_f1_score(file_path_1, file_path_2):
    """Calculate the F1 score between corresponding images in two directories."""
    f1_scores = []
    
    # Load the images
    img1 = load_raster(file_path_1)
    img2 = load_raster(file_path_2)
    
    # Calculate F1 score
    f1 = f1_score(img1, img2, average='macro')
    f1_scores.append(f1)
    
    # Calculate average F1 score across all image pairs
    average_f1 = np.mean(f1_scores)
    
    return average_f1

In [33]:
def load_raster(file_path):
    """Load a raster image and flatten it into a 1D array."""
    with rasterio.open(file_path) as src:
        data = src.read(1)  # Read the first band
    return data.flatten()