In [1]:
"""Importing libraries"""
import time
import os
from collections import Counter
import numpy as np
import cv2
import rasterio # pylint: disable=import-error
import tifffile # pylint: disable=import-error
from sklearn.cluster import MiniBatchKMeans # pylint: disable=import-error
from sklearn.decomposition import PCA # pylint: disable=import-error
from osgeo import gdal
def convert_gray_to_rgb(image):
    """Conversion of single band image to three band image"""
    if len(image.shape) == 2:
        return np.broadcast_to(image[:, :, np.newaxis], (*image.shape, 3))
    return image
def calculate_index(band1, band2, formula):
    """Generalizing index calculation function"""
    b_1 = band1.astype('float64')
    b_2 = band2.astype('float64')
    return formula(b_1,b_2)
def process_image(bands):
    """NDVI, NDWI, NDSI, and SAVI calculation"""
    ndvi = calculate_index(bands['nir'], bands['red'],
    lambda nir, red: np.where((nir + red) == 0., 0, (nir - red) / (nir + red)))
    ndwi = calculate_index(bands['green'], bands['nir'],
    lambda green, nir: np.where((green + nir) == 0., 0, (green - nir) / (green + nir)))
    ndsi = calculate_index(bands['swir1'], bands['nir'],
    lambda swir1, nir: np.where((swir1 + nir) == 0., 0, (swir1 - nir) / (swir1 + nir)))
    savi = calculate_index(bands['nir'], bands['red'],
    lambda nir, red: np.where((nir + red + 0.5) == 0., 0,
                              (nir - red)*(1 + 0.5) / (nir + red + 0.5)))
    return ndvi, ndwi, ndsi, savi
def create_masks(ndvi, ndwi, ndsi, savi):
    """Creating masks based on thresholds for both images"""
    vegetation_mask = ndvi > 0.05
    water_mask = ndwi > 1
    shadow_mask = ndsi > 1
    soil_mask = savi > 1
    combined_mask = np.logical_or(np.logical_or
                                  (vegetation_mask, water_mask), shadow_mask, soil_mask)
    return combined_mask
def apply_mask(image, mask):
    """Applying combined mask to each image"""
    mask = mask[:, :, np.newaxis]
    return np.where(mask, np.nan, image.astype('float64'))
def load_optical_tif_ntf_images(image_path1, image_path2, apply_preprocessing=False):
    """Load optical TIFF, TIF, NITF, and NTF images"""
    if image_path1.lower().endswith(('.tif', '.tiff')):
        def read_image(image_path):
            with rasterio.open(image_path) as src:
                bands = src.read()
            return bands.transpose(1, 2, 0)
        image1 = read_image(image_path1)
        image2 = read_image(image_path2)
    elif image_path1.lower().endswith(('.nitf', '.ntf')):
        dataset_1 = gdal.Open(image_path1)
        dataset_2 = gdal.Open(image_path2)
        image1 = dataset_1.ReadAsArray().transpose(1, 2, 0)
        image2 = dataset_2.ReadAsArray().transpose(1, 2, 0)
    if image1 is None or image2 is None:
        print(f"Error: Unable to load images {image_path1} or {image_path2}")
        return None, None
    if image1.shape != image2.shape:
        image2 = cv2.resize(image2, (image1.shape[1], image1.shape[0])) # pylint: disable=no-member
    if not apply_preprocessing:
        return image1, image2
    bands1 = {
        'red': image1[:, :, 0], 'green': image1[:, :, 1], 'blue': image1[:, :, 2],
        'nir': image1[:, :, 3], 'swir1': image1[:, :, 4], 'swir2': image1[:, :, 5]
    }
    bands2 = {
        'red': image2[:, :, 0], 'green': image2[:, :, 1], 'blue': image2[:, :, 2],
        'nir': image2[:, :, 3], 'swir1': image2[:, :, 4], 'swir2': image2[:, :, 5]
    }
    ndvi1, ndwi1, ndsi1, savi1 = process_image(bands1)
    ndvi2, ndwi2, ndsi2, savi2 = process_image(bands2)
    mask1 = create_masks(ndvi1, ndwi1, ndsi1, savi1)
    mask2 = create_masks(ndvi2, ndwi2, ndsi2, savi2)
    masked_image1 = apply_mask(image1, mask1)
    masked_image2 = apply_mask(image2, mask2)
    masked_image1 = np.nan_to_num(masked_image1, nan=0, posinf=0, neginf=0)
    masked_image2 = np.nan_to_num(masked_image2, nan=0, posinf=0, neginf=0)
    return masked_image1, masked_image2
def load_sar_rgb_images(image_path1, image_path2):
    """Load one and three channel images"""
    if image_path1.lower().endswith(('.tif', '.tiff')):
        image1 = tifffile.imread(image_path1)
        image2 = tifffile.imread(image_path2)
    elif image_path1.lower().endswith(('.nitf', '.ntf')):
        dataset_1 = gdal.Open(image_path1)
        dataset_2 = gdal.Open(image_path2)
        image1 = dataset_1.ReadAsArray()
        image2 = dataset_2.ReadAsArray()
    elif image_path1.lower().endswith(('.jpg', '.jpeg', '.bmp', '.png')):
        image1 = cv2.imread(image_path1) # pylint: disable=no-member
        image2 = cv2.imread(image_path2) # pylint: disable=no-member
    if image1 is None or image2 is None:
        print(f"Error: Unable to load images {image_path1} or {image_path2}")
        return None, None
    image1 = convert_gray_to_rgb(image1)
    image2 = convert_gray_to_rgb(image2)
    if image1.shape != image2.shape:
        image2 = cv2.resize(image2, (image1.shape[1], image1.shape[0])) # pylint: disable=no-member
    return image1, image2
def process_tif_ntf_unsupervised(input_path):
    """Process TIF, TIFF, NITF and NTF optical images, return it as an RGB array"""
    if input_path.lower().endswith(('.tif', '.tiff')):
        with rasterio.open(input_path) as src:
            red = np.nan_to_num(src.read(1), nan=0)
            green = np.nan_to_num(src.read(2), nan=0)
            blue = np.nan_to_num(src.read(3), nan=0)
    elif input_path.lower().endswith(('.nitf', '.ntf')):
        dataset = gdal.Open(input_path)
        red = dataset.GetRasterBand(1).ReadAsArray()
        green = dataset.GetRasterBand(2).ReadAsArray()
        blue = dataset.GetRasterBand(3).ReadAsArray()
        red = np.nan_to_num(red, nan=0)
        green = np.nan_to_num(green, nan=0)
        blue = np.nan_to_num(blue, nan=0)
    def contrast_stretch(band, pmin=2, pmax=98):
        lower = np.percentile(band, pmin)
        upper = np.percentile(band, pmax)
        band = np.clip(band, lower, upper)
        band_scaled = (band - lower) / (upper - lower) * 255
        return band_scaled.astype(np.uint8)
    red =  contrast_stretch(red)
    green =  contrast_stretch(green)
    blue =  contrast_stretch(blue)
    rgb = np.dstack((red, green, blue))
    return rgb
def norm_img(img):
    """Image normalization by computing minimum and maximum pixel values"""
    img = img.astype(np.float32)
    min_val = np.min(img)
    max_val = np.max(img)
    normalized_img = (img - min_val) / (max_val - min_val) * 255
    return normalized_img.astype(np.uint8)
def overlay_image(change_map_resized, image):
    """Overlay change map on input images"""
    _, binary_mask = cv2.threshold(change_map_resized, 200, 255, cv2.THRESH_BINARY)  # pylint: disable=no-member
    green_mask = np.zeros_like(image)
    indices = np.where(binary_mask == 255)
    green_mask[indices[0], indices[1], :] = [0, 255, 0]
    overlay = cv2.add(image, green_mask)  # pylint: disable=no-member
    return overlay
def extract_georeference(input_data):
   """Extract GCPs, projection, and geotransform from a georeferenced image"""
   dataset = gdal.Open(input_data, gdal.GA_ReadOnly)
   if dataset is None:
       print("Failed to open the input file")
   projection = dataset.GetProjection()
   geotransform = dataset.GetGeoTransform()
   gcps = dataset.GetGCPs()
   gcp_projection = dataset.GetGCPProjection()
   dataset = None
   return projection, geotransform, gcps, gcp_projection
def embed_georeference(output_data, projection, geotransform, gcps, gcp_projection):
   """Embeds extracted georeference details into a non-georeferenced image"""
   dataset = gdal.Open(output_data, gdal.GA_Update)
   if dataset is None:
       print("Failed to open the output file for updating")
       return
   if projection:
       dataset.SetProjection(projection)
   if geotransform:
       dataset.SetGeoTransform(geotransform)
   if gcps and gcp_projection:
       dataset.SetGCPs(gcps, gcp_projection)
   dataset = None
   print(f"Georeference details embedded successfully into {output_data}")
def run_georef(input_data, output_data):
    """Main function of georeferencing"""
    projection, geotransform, gcps, gcp_projection = extract_georeference(input_data)
    if projection and geotransform:
        embed_georeference(output_data, projection, geotransform, gcps, gcp_projection)
    else:
        print("Georeference details could not be extracted")
def find_vector_set(diff_image, nw_size):
    """Finding vector set"""
    i = 0
    j = 0
    vector_set = np.zeros((int(nw_size[0] * nw_size[1] / 25), 25))
    while i < vector_set.shape[0]:
        while j < nw_size[1]:
            k = 0
            while k < nw_size[0]:
                block = diff_image[j:j + 5, k:k + 5]
                feature = block.ravel()
                vector_set[i, :] = feature
                k = k + 5
            j = j + 5
        i = i + 1
    mean_vec = np.mean(vector_set, axis=0)
    vector_set = vector_set - mean_vec
    return vector_set, mean_vec
def find_fvs(evs, diff_image, mean_vec, new):
    """Finding feature vector set"""
    i = 2
    feature_vector_set = []
    while i < new[1] - 2:
        j = 2
        while j < new[0] - 2:
            block = diff_image[i - 2:i + 3, j - 2:j + 3]
            feature = block.flatten()
            feature_vector_set.append(feature)
            j = j + 1
        i = i + 1
    fvs = np.dot(feature_vector_set, evs)
    fvs = fvs - mean_vec
    return fvs
def clustering(fvs, components, new):
    """MiniBatchKMeans clustering"""
    kmeans = MiniBatchKMeans(n_clusters=components, batch_size=1024,
                             n_init=10, init='k-means++', random_state=42, verbose=0)
    kmeans.fit(fvs)
    output = kmeans.predict(fvs)
    count = Counter(output)
    small_index = min(count, key=count.get)
    change_mask = np.reshape(output, (new[1] - 4, new[0] - 4))
    return small_index, change_mask
def run_change_detection(image_path1, image_path2):
    """Main execution block"""
    start_time = time.time()
    file_ext1 = os.path.splitext(image_path1)[-1].lower()
    file_ext2 = os.path.splitext(image_path2)[-1].lower()
    if file_ext1 != file_ext2:
        print("Error: Image formats do not match")
        return
    if file_ext1 in ['.jpg', '.jpeg', '.bmp', '.png']:
        image1, image2 = load_sar_rgb_images(image_path1, image_path2)
    elif file_ext1 in ['.tif', '.tiff', '.ntf', '.nitf']:
        if file_ext1 in ['.tif', '.tiff']:
            with rasterio.open(image_path1) as src1:
                num_bands1 = src1.count
            with rasterio.open(image_path2) as src2:
                num_bands2 = src2.count
        else:
            dataset_1 = gdal.Open(image_path1)
            dataset_2 = gdal.Open(image_path2)
            num_bands1 = dataset_1.RasterCount
            num_bands2 = dataset_2.RasterCount
        if num_bands1 == 1 and num_bands2 == 1:
            image1, image2 = load_sar_rgb_images(image_path1, image_path2)
            image1 = norm_img(image1)
            image2 = norm_img(image2)
        else:
            user_input = int(input("1. Without preprocessing, 2. With preprocessing: "))
            if user_input == 1:
                image1, image2 = load_optical_tif_ntf_images(
                image_path1, image_path2, apply_preprocessing=False)
            else:
                image1, image2 = load_optical_tif_ntf_images(
                image_path1, image_path2, apply_preprocessing=True)
    new_size = np.asarray(image1.shape) // 5
    new_size = new_size.astype(int) *5
    image1_resized = cv2.resize(image1, (new_size[0],new_size[1])).astype(int) # pylint: disable=no-member
    image2_resized = cv2.resize(image2, (new_size[0],new_size[1])).astype(int) # pylint: disable=no-member
    difference_image = abs(image1_resized - image2_resized)
    difference_image = difference_image[:, :, 1]
    vector_sets, mean_vector = find_vector_set(difference_image, new_size)
    pca = PCA()
    pca.fit(vector_sets)
    eigen_vector = pca.components_
    feature_vector = find_fvs(eigen_vector, difference_image, mean_vector, new_size)
    components = 3
    least_index, change_map = clustering(feature_vector, components, new_size)
    change_map[change_map == least_index] = 255
    change_map[change_map != 255] = 0
    change_map = change_map.astype(np.uint8)
    original_height, original_width = image1.shape[:2]
    change_map_resized = cv2.resize(change_map, (original_width, original_height)) # pylint: disable=no-member
    end_time = time.time()
    print("Computational time in seconds:", end_time - start_time)
    output_path = None
    if all(path.lower().endswith(('.tif', '.tiff',
                                  '.ntf', '.nitf')) for path in (image_path1, image_path2)):
        file_ext = os.path.splitext(image_path1)[-1].lower()
        if file_ext in ['.tif', '.tiff']:
            with rasterio.open(image_path1) as src:
                num_bands = src.count
        else:
            dataset = gdal.Open(image_path1, gdal.GA_ReadOnly)
            num_bands = dataset.RasterCount
        output_path = 'Output_image.tif'
        if num_bands > 3:
            rgb_image = process_tif_ntf_unsupervised(image_path1)
            overlay = overlay_image(change_map_resized, rgb_image)
        else:
            overlay = overlay_image(change_map_resized, image1)
        tifffile.imwrite(output_path, overlay)
    else:
        overlay = overlay_image(change_map_resized, image1)
        output_path = 'Output_image.png'
        cv2.imwrite(output_path, overlay) # pylint: disable=no-member
    print(f'Output saved to {output_path}')
    run_georef(image_path1, output_path)
image_path1 = 'SGSS_CD_1.tif'  # Path of Image_1
image_path2 = 'SGSS_CD_2.tif'  # Path of Image_2
run_change_detection(image_path1, image_path2)