#  **Satellite Image Analysis Using Convolution Neural Networks For Environmental Monitoring**

In [None]:
%%capture
!pip install transformers
!pip install albumentations opencv-python-headless

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Data Manipulation
import numpy as np
import pandas as pd

import albumentations as A
import cv2

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Computer Vision
import cv2 as cv

# CNN
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras import regularizers
from tensorflow.keras.layers import Conv2D, Dropout, Dense, AveragePooling2D, MaxPooling2D, Flatten, Activation
from tqdm.keras import TqdmCallback
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Vision transformers
from transformers.models.vit.feature_extraction_vit import ViTFeatureExtractor
from transformers import ViTForImageClassification, TrainingArguments, Trainer

# Saving models and data
import joblib

# OS for navigation and environment set up
import os

# Tensorflow
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

# Splitting datasets
from sklearn.model_selection import train_test_split

# Random
import random

# Progress bar
from tqdm import tqdm

# Model import

from keras.models import load_model

# Warnings
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)

# Data import

In [None]:
base_dir = "/content/drive/MyDrive/Satellite Dataset"
train_dir = os.path.join(base_dir, "train")
test_dir = os.path.join(base_dir, "test")
valid_dir = os.path.join(base_dir, "valid")
annotations = pd.read_csv(os.path.join(base_dir, "metadata.csv"))
classes = pd.read_csv(os.path.join(base_dir, "class_dict.csv"))

In [None]:
class_path = '/content/drive/MyDrive/Satellite Dataset/class_dict.csv'
data_path = '/content/drive/MyDrive/Satellite Dataset/metadata.csv'

df = annotations

train_data_paths = df[df['split']=='train']
test_data_paths = df[df['split']=='test']

train_data_paths.head()

Now we have to concatenate all the train relative paths with the base dir.

In [None]:
# Run and Ignore the warnings
train_data_paths['sat_image_path'] = pd.Series([os.path.join(base_dir, path) for path in train_data_paths['sat_image_path']])
train_data_paths['mask_path'] = pd.Series([os.path.join(base_dir, path) for path in train_data_paths['mask_path']])

In [None]:
train_data_paths

In [None]:
labels = pd.read_csv(class_path)
labels

# Class ID	Class Name	      RGB Value	          Color Name
0	        Urban Land	      (0, 255, 255)	      Cyan
1	        Agriculture Land	(255, 255, 0)	      Yellow
2	        Rangeland	        (255, 0, 255)	      Magenta
3	        Forest Land	      (0, 255, 0)	        Green
4	        Water	            (0, 0, 255)	        Blue
5	        Barren Land	      (255, 255, 255)	    White
6	        Unknown	          (0, 0, 0)	          Black

Notice that the land segmentation is given by:

| Class  | Colour |
| --- | --- |
| `urban_land` |  cyan |
| `agriculture_land` | yellow |
| `rangeland` | magenta |
| `forest_land` | Green |
| `water` | Blue  |
| `barren_land` | White |
| `unknown` | Black |


# Data-viz

In [None]:
sample_path = train_data_paths['sat_image_path'][3]
sample_image = cv.imread(sample_path)

plt.figure(figsize=(10, 10))
plt.title('Satellital image')
plt.imshow(sample_image)
plt.axis('off')
plt.show()

In [None]:
sample1 = train_data_paths['sat_image_path'][:200]
sample2 = train_data_paths['mask_path'][:200]

i=-2

fig, ax = plt.subplots(ncols=20, nrows=10, figsize=(20, 10))
for x in range(10):
  i+=1
  for y in range(20):
    i+=1
    if i%2==0:
      path = sample1[5*x+y]
    else:
      path = sample2[5*x+y]
    image = cv.imread(path)
    ax[x, y].imshow(image)
    ax[x, y].axis('off')
plt.suptitle('Satellital images')
fig.show()

In [None]:
def plot_overlay(index_img=0, data_path=train_data_paths):
  # Load the image and the mask
  img = plt.imread(data_path['sat_image_path'][index_img])
  mask = plt.imread(data_path['mask_path'][index_img])

  # Create a figure and axes
  fig, axs = plt.subplots(1,3, figsize=(24,8))

  # Display the sat image
  axs[0].imshow(img)
  axs[0].set_title("Sat Image")
  axs[0].axis('off')

  # Display the mask
  axs[1].imshow(mask)
  axs[1].set_title("Mask")
  axs[1].axis('off')

  # Display the mask over the sat image
  axs[2].imshow(img)
  axs[2].imshow(mask, alpha=0.3)
  axs[2].set_title("Overlay")
  axs[2].axis('off')

  # show the plot
  plt.show()

Observe how segmentations are displayed:

In [None]:
for i in random.sample(range(len(train_data_paths)), 5):
  plot_overlay(index_img=i)

# Data preprocessing

## One Hot **Encoding**

Target: convert mask rgb values into one hot encoding vectors.

In [None]:
train_data_paths['mask_path'][8]

In [None]:
sample_image = plt.imread(train_data_paths['mask_path'][8])

plt.figure(figsize=(10, 10))
plt.title('Example of Mask Image')
plt.axis('off')
plt.imshow(sample_image)
plt.show()

In [None]:
sample_image[45][68]

In [None]:
uniques = np.array([sample_image[i][j] for i in range(2448) for j in range(2448)])
uniques = np.unique(uniques)

In [None]:
uniques

## Image Tiling

In [None]:
sample_path = train_data_paths['sat_image_path'][0]
sample_image = plt.imread(sample_path)

In [None]:
sample_image.shape

In [None]:
plt.figure(figsize=(15, 15))
plt.imshow(sample_image)
plt.title('Sample image before tiling')
plt.axis('off')
plt.show()

We are going to usea a tile of size $272\times272$ In order to apply a seamless semantic segmentation.

In [None]:
def crop_image(image, tile_shape=(272, 272), show_results=True) -> np.ndarray:
  """
  Parameters:

    - image=np.ndarray. Representation of image as a numpy tensor, must have 3 dimensions.
    - tile_shape= tuple. size of tile dimensions. Its values must be divisors of the ones of the original image according with axes. Each cropped image will have this size
    - show_results=bool. Boolean selection for visualization purposes.

  Output: List of cropped images as a Numpy ndarray
  """

  image_shape = image.shape
  n_rows = image_shape[0] // tile_shape[0]
  n_cols = image_shape[1] // tile_shape[1]

  sub_images = []
  if show_results==True:
    fig, ax = plt.subplots(nrows=n_rows, ncols=n_cols, figsize=(15, 15))

  for i in range(n_rows):
    for j in range(n_cols):
      sub_image = sample_image[i*272:(1+i)*272,j*272:(1+j)*272,:]
      sub_images.append(sub_image)
      if show_results==True:
        ax[i, j].imshow(sub_image)
        ax[i, j].axis('off')

  if show_results==True:
    fig.suptitle('Cropped Images')
    plt.show()

  sub_images = np.array(sub_images)
  return sub_images

In [None]:
sub_images = crop_image(sample_image)

In [None]:
plt.figure(figsize=(10, 10))
plt.title('Example sub Image')
plt.axis('off')
plt.imshow(sub_images[0])
plt.show()

In [None]:
def re_crop_image(cropped_images, image_shape=(2448, 2448, 3), show_results=True)->np.ndarray:
  """
  Parameters:
    - cropped_images=np.ndarray: Set of sub images as a numpy tensor or list of numpy 3D arrays.
    - image_shape=tuple. Desired output image shape.
    - show_results=bool. Boolean selection for visualization purposes.
  Output: new_image=np.ndarray. Image cropped as np array.
  """
  new_image = np.zeros((2448, 2448, 3))
  tile_shape = cropped_images[0].shape
  cropped_images = list(cropped_images)
  n_rows = image_shape[0] // tile_shape[0]
  n_cols = image_shape[1] // tile_shape[1]
  for i in range(n_rows):
    for j in range(n_cols):
      sub_image = cropped_images[i*n_rows+j]
      new_image[i*272:(1+i)*272,j*272:(1+j)*272,:] = sub_image / 255

  if show_results:
    plt.figure(figsize=(15, 15))
    plt.axis('off')
    plt.title('Re-Cropped Image')
    plt.imshow(new_image)
  return new_image

In [None]:
re_cropped_image = re_crop_image(sub_images)

In [None]:
sub_images.shape

## Prepare data to train

In [None]:
train_image_paths, train_mask_paths, val_image_paths, val_mask_paths = (train_data_paths['sat_image_path'], train_data_paths['mask_path'], test_data_paths['sat_image_path'], test_data_paths['mask_path'])

In [None]:
X_train, X_test, y_train, y_test = train_test_split(train_image_paths, train_mask_paths, test_size=0.2, random_state=42)

In [None]:
X_train

In [None]:
X_test

In [None]:
# Reindexing

X_train = pd.Series(X_train.tolist())
X_test = pd.Series(X_test.tolist())

y_train = pd.Series(y_train.tolist())
y_test = pd.Series(y_test.tolist())

In [None]:
X_train

In [None]:
plt.imshow(plt.imread(X_train[2]))
plt.title('Example Sat Image')
plt.axis('off')
plt.show()

In [None]:
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

In [None]:
def load_data(images_path, masks_path, shape):
  data = {'images': [], 'masks': []}
  progress_bar = tqdm(range(len(images_path)),
                      desc="Loading Data",
                      total=len(images_path),
                      bar_format="{desc}: {percentage:3.0f}%|\033[32m{bar}\033[0m| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]")
  for i in progress_bar:

    img = plt.imread(images_path[i])
    mask = plt.imread(masks_path[i])

    img = cv.resize(img, shape)
    mask = cv.resize(mask, shape)

    data['images'].append(img)
    data['masks'].append(mask)

  data['images'] = np.array(data['images'])
  data['masks'] = np.array(data['masks'])

  return data

In [None]:
train_data = load_data(images_path=X_train, masks_path=y_train,
                       shape=(256, 256)
                       )
test_data = load_data(images_path=X_test, masks_path=y_test,
                      shape=(256, 256)
                      )

In [None]:
train_data['masks'].shape

In [None]:
len(X_train)

In [None]:
train_data['masks'][0]

# Training

## U-Net

In [None]:
# def create_conv_block(input_tensor, num_filters):
#   """
#     Helps us to create the convolutional blocks
#   """
#   # First Conv layer
#   x = tf.keras.layers.Conv2D(filters=num_filters, kernel_size=(3, 3), kernel_initializer='he_normal', padding='same')(input_tensor)
#   x = tf.keras.layers.BatchNormalization()(x)
#   x = tf.keras.layers.Activation('relu')(x)
#   # Second Conv layer

#   x = tf.keras.layers.Conv2D(filters=num_filters, kernel_size=(3, 3), kernel_initializer='he_normal', padding='same')(x)
#   x = tf.keras.layers.BatchNormalization()(x)
#   x = tf.keras.layers.Activation('relu')(x)

#   return x

def create_conv_block(input_tensor, num_filters):
    """
    Enhanced convolutional block with residual connection and advanced activation
    """
    # First Conv layer with residual connection
    x = tf.keras.layers.Conv2D(filters=num_filters, kernel_size=(3, 3),
                                kernel_initializer='he_normal', padding='same')(input_tensor)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('swish')(x)  # Replace ReLU with Swish activation

    # Second Conv layer
    x = tf.keras.layers.Conv2D(filters=num_filters, kernel_size=(3, 3),
                                kernel_initializer='he_normal', padding='same')(x)
    x = tf.keras.layers.BatchNormalization()(x)

    # Residual connection
    if input_tensor.shape[-1] != num_filters:
        shortcut = tf.keras.layers.Conv2D(num_filters, (1, 1), padding='same')(input_tensor)
    else:
        shortcut = input_tensor

    # Combine residual connection
    x = tf.keras.layers.Add()([x, shortcut])
    x = tf.keras.layers.Activation('swish')(x)

    return x

In [None]:
# def create_unet(input_shape, num_filters=16, dropout=0.1):
#   """
#     Creates the U-Net Architecture
#   """
#   # Encoder
#     # First conv block
#   c1 = create_conv_block(input_shape, num_filters * 1)
#   p1 = tf.keras.layers.MaxPool2D((2, 2))(c1)
#   p1 = tf.keras.layers.Dropout(dropout)(p1)
#     # Second conv block
#   c2 = create_conv_block(p1, num_filters * 2)
#   p2 = tf.keras.layers.MaxPool2D((2, 2))(c2)
#   p2 = tf.keras.layers.Dropout(dropout)(p2)
#     # Third conv block
#   c3 = create_conv_block(p2, num_filters * 4)
#   p3 = tf.keras.layers.MaxPool2D((2, 2))(c3)
#   p3 = tf.keras.layers.Dropout(dropout)(p3)
#     # Fourth conv block
#   c4 = create_conv_block(p3, num_filters * 8)
#   p4 = tf.keras.layers.MaxPool2D((2, 2))(c4)
#   p4 = tf.keras.layers.Dropout(dropout)(p4)

#   c5 = create_conv_block(p4, num_filters*16)

#   # Decoder
#     # First block
#   u6 = tf.keras.layers.Conv2DTranspose(num_filters*8, (3, 3), strides=(2, 2), padding='same')(c5)
#   u6 = tf.keras.layers.concatenate([u6, c4])
#   u6 = tf.keras.layers.Dropout(dropout)(u6)
#   c6 = create_conv_block(u6, num_filters*8)
#     # Second block
#   u7 = tf.keras.layers.Conv2DTranspose(num_filters*4, (3, 3), strides=(2, 2), padding='same')(c6)
#   u7 = tf.keras.layers.concatenate([u7, c3])
#   u7 = tf.keras.layers.Dropout(dropout)(u7)
#   c7 = create_conv_block(u7, num_filters*4)
#     #Third block
#   u8 = tf.keras.layers.Conv2DTranspose(num_filters*2, (3, 3), strides=(2, 2), padding='same')(c7)
#   u8 = tf.keras.layers.concatenate([u8, c2])
#   u8 = tf.keras.layers.Dropout(dropout)(u8)
#   c8 = create_conv_block(u8, num_filters*2)
#     # Fourth block
#   u9 = tf.keras.layers.Conv2DTranspose(num_filters*1, (3, 3), strides=(2, 2), padding='same')(c8)
#   u9 = tf.keras.layers.concatenate([u9, c1])
#   c9 = tf.keras.layers.Dropout(dropout)(u9)

#   output = tf.keras.layers.Conv2D(3, (1, 1), activation='sigmoid')(c9)
#   model = tf.keras.Model(inputs = [input_shape], outputs=[output])

#   return model

def create_unet(input_shape, num_filters=16, dropout=0.2):
    """
    Enhanced U-Net Architecture with Multi-Scale Feature Fusion
    """
    # Encoder
    # First conv block with multi-scale feature preparation
    c1 = create_conv_block(input_shape, num_filters * 1)
    p1 = tf.keras.layers.MaxPool2D((2, 2))(c1)
    p1 = tf.keras.layers.SpatialDropout2D(dropout)(p1)

    # Second conv block with multi-scale feature fusion
    c2 = create_conv_block(p1, num_filters * 2)
    # Resize c1 before concatenation to match c2's spatial dimensions
    c1_resized = tf.keras.layers.Resizing(c2.shape[1], c2.shape[2])(c1)
    c2_fused = tf.keras.layers.concatenate([c1_resized, c2])  # Multi-scale feature fusion
    p2 = tf.keras.layers.MaxPool2D((2, 2))(c2)
    p2 = tf.keras.layers.SpatialDropout2D(dropout)(p2)

    # Third conv block
    c3 = create_conv_block(p2, num_filters * 4)
    # Resize c2_fused before concatenation to match c3's spatial dimensions
    c2_fused_resized = tf.keras.layers.Resizing(c3.shape[1], c3.shape[2])(c2_fused)
    c3_fused = tf.keras.layers.concatenate([c2_fused_resized, c3])
    p3 = tf.keras.layers.MaxPool2D((2, 2))(c3)
    p3 = tf.keras.layers.SpatialDropout2D(dropout)(p3)

    # Fourth conv block
    c4 = create_conv_block(p3, num_filters * 8)
    # Resize c3_fused before concatenation to match c4's spatial dimensions
    c3_fused_resized = tf.keras.layers.Resizing(c4.shape[1], c4.shape[2])(c3_fused)
    c4_fused = tf.keras.layers.concatenate([c3_fused_resized, c4])
    p4 = tf.keras.layers.MaxPool2D((2, 2))(c4)
    p4 = tf.keras.layers.SpatialDropout2D(dropout)(p4)

    # Bottleneck
    c5 = create_conv_block(p4, num_filters*16)

    # Decoder with enhanced feature fusion
    # First block
    u6 = tf.keras.layers.Conv2DTranspose(num_filters*8, (3, 3), strides=(2, 2), padding='same')(c5)
    # Resize c4_fused before concatenation to match u6's spatial dimensions
    c4_fused_resized = tf.keras.layers.Resizing(u6.shape[1], u6.shape[2])(c4_fused)
    u6 = tf.keras.layers.concatenate([u6, c4_fused_resized])
    u6 = tf.keras.layers.SpatialDropout2D(dropout)(u6)
    c6 = create_conv_block(u6, num_filters*8)

    # Second block
    u7 = tf.keras.layers.Conv2DTranspose(num_filters*4, (3, 3), strides=(2, 2), padding='same')(c6)
    # Resize c3_fused before concatenation to match u7's spatial dimensions
    c3_fused_resized = tf.keras.layers.Resizing(u7.shape[1], u7.shape[2])(c3_fused)
    u7 = tf.keras.layers.concatenate([u7, c3_fused_resized])
    u7 = tf.keras.layers.SpatialDropout2D(dropout)(u7)
    c7 = create_conv_block(u7, num_filters*4)

    # Third block
    u8 = tf.keras.layers.Conv2DTranspose(num_filters*2, (3, 3), strides=(2, 2), padding='same')(c7)
    # Resize c2_fused before concatenation to match u8's spatial dimensions
    c2_fused_resized = tf.keras.layers.Resizing(u8.shape[1], u8.shape[2])(c2_fused)
    u8 = tf.keras.layers.concatenate([u8, c2_fused_resized])
    u8 = tf.keras.layers.SpatialDropout2D(dropout)(u8)
    c8 = create_conv_block(u8, num_filters*2)

    # Fourth block
    u9 = tf.keras.layers.Conv2DTranspose(num_filters*1, (3, 3), strides=(2, 2), padding='same')(c8)
    u9 = tf.keras.layers.concatenate([u9, c1])  # c1 shape is already compatible
    c9 = tf.keras.layers.SpatialDropout2D(dropout)(u9)

    # Enhanced output layer with multiple convolutions
    output = tf.keras.layers.Conv2D(64, (3, 3), padding='same', activation='swish')(c9)
    output = tf.keras.layers.BatchNormalization()(output)
    output = tf.keras.layers.Conv2D(32, (3, 3), padding='same', activation='swish')(output)
    output = tf.keras.layers.Conv2D(3, (1, 1), activation='sigmoid')(output)

    model = tf.keras.Model(inputs=[input_shape], outputs=[output])

    return model

In [None]:
input_shape = train_data['masks'][0].shape;
inputs = tf.keras.layers.Input(input_shape);

model = create_unet(inputs, num_filters=32);

In [None]:
print(inputs)

In [None]:
early = EarlyStopping(patience=20)
print(input_shape)

In [None]:
model.compile(optimizer='Adam', loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
model.summary()

In [None]:
tf.keras.utils.plot_model(model, show_shapes=True)

In [None]:
checkpoint = tf.keras.callbacks.ModelCheckpoint('best.keras', monitor='accuracy', save_best_only=True, verbose=1)

In [None]:
model_history = model.fit(train_data['images'], train_data['masks'],
                          epochs=100,
                          verbose=0,
                          batch_size=16,
                          callbacks=[checkpoint, TqdmCallback(verbose=1), early]
                        )

In [None]:
# model = load_model('/content/drive/MyDrive/Satellite Dataset/model.hdf5')
model.save('/content/drive/MyDrive/Satellite Dataset/eighty.keras')

In [None]:
model = load_model('/content/drive/MyDrive/Satellite Dataset/eighty.keras')

In [None]:
model

In [None]:
def predict_test_samples(val_map, model):
    # Get the first 5 images and masks
    imgs = val_map['images'][:25]
    masks = val_map['masks'][:25]

    # Predict masks for these 25 images
    predicted_masks = model.predict(imgs)

    return imgs, predicted_masks, masks


In [None]:
imgs, predicted_masks, masks = predict_test_samples(test_data, model);

In [None]:
def plot_predictions(image, mask_pred, ground_truth_mask):
  fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(30, 10))

  ax[0].imshow(image)
  ax[0].set_title('Original satellital image')
  ax[0].axis('off')

  ax[1].imshow(mask_pred)
  ax[1].set_title('Predicted Mask')
  ax[1].axis('off')

  ax[2].imshow(ground_truth_mask)
  ax[2].set_title('Ground truth Mask')
  ax[2].axis('off')

  plt.show()

In [None]:
for i in range(5):
  plot_predictions(imgs[i], predicted_masks[i], masks[i])

### Inference

In [None]:
# import cv2
# import numpy as np
# import pandas as pd
# from google.colab.patches import cv2_imshow

# def detect_class_color_portions(image_path):
#     """Detects portions of land classes in an image based on RGB colors and calculates their percentage in the image."""

#     class_colors = {
#         "urban_land": [0, 255, 255],
#         "agriculture_land": [255, 255, 0],
#         "rangeland": [255, 0, 255],
#         "forest_land": [0, 255, 0],
#         "water": [0, 0, 255],
#         "barren_land": [255, 255, 255],
#         "Unknown": [0, 0, 0]
#     }

#     image = cv2.imread(image_path)
#     if image is None:
#         print(f"Error: Image {image_path} not loaded properly.")
#         return None

#     hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

#     color_ranges = {
#         "urban_land": [np.array([20, 100, 100]), np.array([40, 255, 255])],
#         "agriculture_land": [np.array([80, 100, 100]), np.array([100, 255, 255])],
#         "rangeland": [np.array([140, 100, 100]), np.array([170, 255, 255])],
#         "forest_land": [np.array([36, 50, 50]), np.array([86, 255, 255])],
#         "water": [np.array([100, 50, 50]), np.array([130, 255, 255])],
#         "barren_land": [np.array([0, 0, 255]), np.array([180, 255, 255])],
#         "unknown": [np.array([0, 0, 0]), np.array([180, 255, 50])]
#     }

#     total_pixels = image.shape[0] * image.shape[1]
#     class_percentages = {}

#     for class_name, (lower, upper) in color_ranges.items():
#         mask = cv2.inRange(hsv, lower, upper)
#         class_pixels = cv2.countNonZero(mask)
#         class_percentage = (class_pixels / total_pixels) * 100
#         class_percentages[class_name] = class_percentage
#     return pd.DataFrame.from_dict(class_percentages, orient='index', columns=['Percentage'])

# def compare_images_with_display(image_path1, image_path2):
#     df1 = detect_class_color_portions(image_path1)
#     df2 = detect_class_color_portions(image_path2)

#     if df1 is None or df2 is None:
#         print("Error: One or both images failed to load or process.")
#         return None

#     # Compute the difference as image1 - image2
#     df_comparison = pd.merge(df1, df2, left_index=True, right_index=True, suffixes=('_image1', '_image2'))
#     df_comparison['Difference'] = df_comparison['Percentage_image1'] - df_comparison['Percentage_image2']

#     # Generate the textual statements
#     statements = []
#     for class_name, row in df_comparison.iterrows():
#         percentage1 = row['Percentage_image1']
#         percentage2 = row['Percentage_image2']
#         difference = row['Difference']
#         change_type = "increased" if difference > 0 else "reduced"

#         if difference != 0:  # Only include classes with differences
#             statements.append(
#                 f"{class_name.replace('_', ' ').title()} has been {change_type} from {percentage1:.2f}% to {percentage2:.2f}% "
#                 f"({abs(difference):.2f}% change)."
#             )

#     # Display images side by side
#     img1 = cv2.imread(image_path1)
#     img2 = cv2.imread(image_path2)

#     if img1 is None or img2 is None:
#         print("Error: One or both images failed to load properly.")
#         return None

#     # Resize images to the same height for better comparison
#     height = min(img1.shape[0], img2.shape[0])
#     img1 = cv2.resize(img1, (int(img1.shape[1] * height / img1.shape[0]), height))
#     img2 = cv2.resize(img2, (int(img2.shape[1] * height / img2.shape[0]), height))

#     # Combine images side by side
#     combined_image = np.hstack((img1, img2))

#     # print("Varitions of the Areas through Satellite Images")
#     cv2_imshow(combined_image)

#     # Generate textual change summary
#     print("Change Summary:\n")
#     for statement in statements:
#         print(statement)

#     return df_comparison


model = load_model('/content/drive/MyDrive/Satellite Dataset/eighty.keras')

import cv2
import numpy as np
import pandas as pd
from google.colab.patches import cv2_imshow

def detect_class_color_portions(image_path):
    """Detects portions of land classes in an image based on RGB colors and calculates their percentage in the image."""
    class_colors = {
        "urban_land": [0, 255, 255],
        "agriculture_land": [255, 255, 0],
        "rangeland": [255, 0, 255],
        "forest_land": [0, 255, 0],
        "water": [0, 0, 255],
        "barren_land": [255, 255, 255],
        "Unknown": [0, 0, 0]
    }

    image = cv2.imread(image_path)
    if image is None:
        print(f"Error: Image {image_path} not loaded properly.")
        return None

    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    color_ranges = {
        "urban_land": [np.array([20, 100, 100]), np.array([40, 255, 255])],
        "agriculture_land": [np.array([80, 100, 100]), np.array([100, 255, 255])],
        "rangeland": [np.array([140, 100, 100]), np.array([170, 255, 255])],
        "forest_land": [np.array([36, 50, 50]), np.array([86, 255, 255])],
        "water": [np.array([100, 50, 50]), np.array([130, 255, 255])],
        "barren_land": [np.array([0, 0, 255]), np.array([180, 255, 255])],
        "unknown": [np.array([0, 0, 0]), np.array([180, 255, 50])]
    }

    total_pixels = image.shape[0] * image.shape[1]

    class_percentages = {}
    for class_name, (lower, upper) in color_ranges.items():
        mask = cv2.inRange(hsv, lower, upper)
        class_pixels = cv2.countNonZero(mask)
        class_percentage = (class_pixels / total_pixels) * 100
        class_percentages[class_name] = class_percentage

    return pd.DataFrame.from_dict(class_percentages, orient='index', columns=['Percentage'])

def compare_images_with_display(image_path1, image_path2):
    df1 = detect_class_color_portions(image_path1)
    df2 = detect_class_color_portions(image_path2)

    if df1 is None or df2 is None:
        print("Error: One or both images failed to load or process.")
        return None

    # Compute the difference as image2 - image1 (to get correct change direction)
    df_comparison = pd.merge(df1, df2, left_index=True, right_index=True, suffixes=('_image1', '_image2'))
    df_comparison['Difference'] = df_comparison['Percentage_image2'] - df_comparison['Percentage_image1']

    # Generate the textual statements
    statements = []
    for class_name, row in df_comparison.iterrows():
        percentage1 = row['Percentage_image1']
        percentage2 = row['Percentage_image2']
        difference = row['Difference']

        # Determine change type based on the actual difference
        change_type = "increased" if difference > 0 else "reduced"

        if abs(difference) > 0.01:  # Only include classes with significant changes
            statements.append(
                f"{class_name.replace('_', ' ').title()} has been {change_type} from {percentage1:.2f}% to {percentage2:.2f}% "
                f"({abs(difference):.2f}% change)."
            )

    # Display images side by side
    img1 = cv2.imread(image_path1)
    img2 = cv2.imread(image_path2)

    if img1 is None or img2 is None:
        print("Error: One or both images failed to load properly.")
        return None

    # Resize images to the same height for better comparison
    height = min(img1.shape[0], img2.shape[0])
    img1 = cv2.resize(img1, (int(img1.shape[1] * height / img1.shape[0]), height))
    img2 = cv2.resize(img2, (int(img2.shape[1] * height / img2.shape[0]), height))

    # Combine images side by side
    combined_image = np.hstack((img1, img2))

    cv2_imshow(combined_image)

    # Generate textual change summary
    print("Change Summary:\n")
    for statement in statements:
        print(statement)

    return df_comparison

In [None]:
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
import os

def predict_for_image(image_path, model, mask,  target_shape=(256, 256), save_dir='/content/drive/MyDrive/Satellite Dataset/Predictions'):
    image = plt.imread(image_path)
    image_resized = cv.resize(image, target_shape)
    predicted_mask = model.predict(np.expand_dims(image_resized, axis=0))[0]
    save_prediction(image_resized, predicted_mask, save_dir, mask)

    plot_predictions(image_resized, predicted_mask)

def save_prediction(image, mask_pred, save_dir, mask):
    os.makedirs(save_dir, exist_ok=True)

    # Always save the mask as 'mask1.png'
    mask_filename = os.path.join(save_dir, mask)

    # Normalize mask to 0-255 and convert to uint8
    mask_pred_normalized = (mask_pred * 255).astype(np.uint8)

    # Save the mask image
    cv.imwrite(mask_filename, mask_pred_normalized)
    print(f"Predicted mask saved at {mask_filename}")

def plot_predictions(image, mask_pred):
    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(20, 10))

    ax[0].imshow(image)
    ax[0].set_title('Original Satellite Image')
    ax[0].axis('off')

    ax[1].imshow(mask_pred, cmap='gray')
    ax[1].set_title('Predicted Mask')
    ax[1].axis('off')

    plt.show()

# Example usage
image_path1 = '/content/259219_sat.jpg'
image_path2 = '/content/323588_sat.jpg'

predict_for_image(image_path1, model, 'mask1.png')
predict_for_image(image_path2, model, 'mask2.png')

In [None]:
# Compare the two images
image_path1 = "/content/drive/MyDrive/Satellite Dataset/Predictions/mask1.png"
image_path2 = "/content/drive/MyDrive/Satellite Dataset/Predictions/mask2.png"

class_diff_df = compare_images_with_display(image_path1, image_path2)

if class_diff_df is not None:
    print("\nAreas Difference Over the Period:")
    print(class_diff_df[['Difference', 'Percentage_image1', 'Percentage_image2']])

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import load_model
from google.colab import drive
from google.colab.patches import cv2_imshow
import re

class SatelliteImageAnalyzer:
    def __init__(self, model_path, predictions_dir='/content/drive/MyDrive/Satellite Dataset/Predictions'):
        """
        Initialize the Satellite Image Analyzer

        Args:
            model_path (str): Path to the trained U-Net model
            predictions_dir (str): Directory to save prediction masks
        """
        # Mount Google Drive
        drive.mount('/content/drive', force_remount=True)

        # Load the pre-trained model
        self.model = load_model(model_path)

        # Set prediction directory
        self.predictions_dir = predictions_dir
        os.makedirs(self.predictions_dir, exist_ok=True)

        # Define color ranges for land classification
        self.color_ranges = {
            "urban_land": [np.array([20, 100, 100]), np.array([40, 255, 255])],
            "agriculture_land": [np.array([80, 100, 100]), np.array([100, 255, 255])],
            "rangeland": [np.array([140, 100, 100]), np.array([170, 255, 255])],
            "forest_land": [np.array([36, 50, 50]), np.array([86, 255, 255])],
            "water": [np.array([100, 50, 50]), np.array([130, 255, 255])],
            "barren_land": [np.array([0, 0, 255]), np.array([180, 255, 255])],
            "unknown": [np.array([0, 0, 0]), np.array([180, 255, 50])]
        }

    def predict_mask(self, image_path, mask_name, target_shape=(256, 256)):
        """
        Predict mask for a given satellite image

        Args:
            image_path (str): Path to the satellite image
            mask_name (str): Name of the mask file to save
            target_shape (tuple): Resize dimensions for the image

        Returns:
            numpy.ndarray: Predicted mask
        """
        # Read and resize image
        image = plt.imread(image_path)
        image_resized = cv2.resize(image, target_shape)

        # Predict mask
        predicted_mask = self.model.predict(np.expand_dims(image_resized, axis=0))[0]

        # Save prediction
        mask_filename = os.path.join(self.predictions_dir, mask_name)
        mask_pred_normalized = (predicted_mask * 255).astype(np.uint8)
        cv2.imwrite(mask_filename, mask_pred_normalized)

        return predicted_mask

    def detect_class_color_portions(self, image_path):
        """
        Detect land class portions in an image

        Args:
            image_path (str): Path to the image

        Returns:
            pandas.DataFrame: Percentage of each land class
        """
        image = cv2.imread(image_path)
        if image is None:
            print(f"Error: Image {image_path} not loaded properly.")
            return None

        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        total_pixels = image.shape[0] * image.shape[1]

        class_percentages = {}
        for class_name, (lower, upper) in self.color_ranges.items():
            mask = cv2.inRange(hsv, lower, upper)
            class_pixels = cv2.countNonZero(mask)
            class_percentage = (class_pixels / total_pixels) * 100
            class_percentages[class_name] = class_percentage

        return pd.DataFrame.from_dict(class_percentages, orient='index', columns=['Percentage'])

    def compare_images(self, image_path1, image_path2):
        """
        Compare two satellite images and generate change summary

        Args:
            image_path1 (str): Path to first image
            image_path2 (str): Path to second image

        Returns:
            dict: Detailed change analysis
        """
        # Load color percentages
        df1 = self.detect_class_color_portions(image_path1)
        df2 = self.detect_class_color_portions(image_path2)

        if df1 is None or df2 is None:
            print("Error: Image analysis failed.")
            return None

        # Compute differences
        df_comparison = pd.merge(df1, df2, left_index=True, right_index=True,
                                 suffixes=('_image1', '_image2'))
        df_comparison['Difference'] = df_comparison['Percentage_image2'] - df_comparison['Percentage_image1']

        # Generate change statements
        change_statements = []
        for class_name, row in df_comparison.iterrows():
            percentage1 = row['Percentage_image1']
            percentage2 = row['Percentage_image2']
            difference = row['Difference']

            if abs(difference) > 0.01:
                change_type = "increased" if difference > 0 else "reduced"
                statement = (
                    f"{class_name.replace('_', ' ').title()} has been {change_type} "
                    f"from {percentage1:.2f}% to {percentage2:.2f}% "
                    f"({abs(difference):.2f}% change)."
                )
                change_statements.append(statement)

        # Visualize images
        img1 = cv2.imread(image_path1)
        img2 = cv2.imread(image_path2)

        if img1 is not None and img2 is not None:
            height = min(img1.shape[0], img2.shape[0])
            img1_resized = cv2.resize(img1, (int(img1.shape[1] * height / img1.shape[0]), height))
            img2_resized = cv2.resize(img2, (int(img2.shape[1] * height / img2.shape[0]), height))
            combined_image = np.hstack((img1_resized, img2_resized))
            cv2_imshow(combined_image)

        # Prepare areas difference table
        areas_difference = df_comparison[['Difference', 'Percentage_image1', 'Percentage_image2']]
        areas_difference.columns = ['Difference', 'Percentage Period 1', 'Percentage Period 2']

        return {
            'change_statements': change_statements,
            'detailed_comparison': df_comparison,
            'areas_difference': areas_difference
        }

    def generate_environmental_insights(self, change_analysis):
        """
        Generate advanced environmental insights based on land use changes

        Args:
            change_analysis (dict): Change analysis from compare_images method

        Returns:
            dict: Comprehensive environmental insights
        """
        if not change_analysis or not change_analysis['change_statements']:
            return {"summary": "No significant changes detected."}

        insights = {
            "summary": "",
            "detailed_insights": {}
        }

        # Areas Difference Table
        areas_difference = change_analysis['areas_difference']
        insights['areas_difference'] = areas_difference.to_string()

        for statement in change_analysis['change_statements']:
            # Use regex to extract key information
            match = re.match(r'(\w+ \w+) has been (\w+) from (\d+\.\d+)% to (\d+\.\d+)% \((\d+\.\d+)% change\).', statement)

            if match:
                class_name = match.group(1).lower().replace(' ', '_')
                change_type = match.group(2)
                percentage_change = float(match.group(5))

                # Advanced AI-driven insights generation
                if 'urban' in class_name:
                    insights['detailed_insights'][class_name] = self._generate_urban_insights(
                        change_type, percentage_change
                    )

                elif 'agriculture' in class_name:
                    insights['detailed_insights'][class_name] = self._generate_agriculture_insights(
                        change_type, percentage_change
                    )

                elif 'rangeland' in class_name:
                    insights['detailed_insights'][class_name] = self._generate_rangeland_insights(
                        change_type, percentage_change
                    )

                elif 'forest' in class_name:
                    insights['detailed_insights'][class_name] = self._generate_forest_insights(
                        change_type, percentage_change
                    )

                elif 'water' in class_name:
                    insights['detailed_insights'][class_name] = self._generate_water_insights(
                        change_type, percentage_change
                    )

        # Combine change statements into summary
        insights['summary'] = " ".join(change_analysis['change_statements'])

        return insights

    def _generate_urban_insights(self, change_type, percentage_change):
        """AI-driven urban land change insights"""
        base_insights = [
            "Urban land transformations can signal profound socio-economic shifts.",
            "Changes in urban landscapes reflect complex interactions between human development and environmental dynamics."
        ]

        if change_type == 'reduced':
            adaptive_insights = [
                f"Significant urban land reduction ({percentage_change:.2f}%) suggests potential de-urbanization trends.",
                "Possible factors include: economic decentralization, population migration, or environmental conservation efforts.",
                "This reduction might indicate a shift towards more sustainable, distributed settlement patterns."
            ]
        else:
            adaptive_insights = [
                f"Urban expansion of {percentage_change:.2f}% indicates accelerated urban development.",
                "Potential implications include increased infrastructure demands, economic growth, and potential environmental pressures.",
                "Recommend comprehensive urban planning to mitigate ecological footprint."
            ]

        return "\n".join(base_insights + adaptive_insights)

    def _generate_agriculture_insights(self, change_type, percentage_change):
        """AI-driven agriculture land change insights"""
        base_insights = [
            "Agricultural land transformations are critical indicators of regional ecological and economic health.",
            "Land use changes reflect complex interactions between farming practices, climate, and socio-economic factors."
        ]

        if change_type == 'increased':
            adaptive_insights = [
                f"Substantial agricultural land increase ({percentage_change:.2f}%) suggests significant land-use reconfiguration.",
                "Potential drivers: food security initiatives, climate adaptation strategies, or agricultural policy changes.",
                "Recommend evaluating sustainability of expanded agricultural practices and potential ecosystem impacts."
            ]
        else:
            adaptive_insights = [
                f"Agricultural land reduction of {percentage_change:.2f}% might indicate challenging agricultural conditions.",
                "Possible factors: urbanization, land degradation, or shifts in agricultural productivity.",
                "Urgent need for sustainable land management and diversification strategies."
            ]

        return "\n".join(base_insights + adaptive_insights)

    def _generate_rangeland_insights(self, change_type, percentage_change):
        """AI-driven rangeland change insights"""
        base_insights = [
            "Rangeland transformations are sensitive indicators of ecological resilience and land-use dynamics.",
            "Changes reflect complex interactions between livestock management, climate conditions, and ecosystem health."
        ]

        if change_type == 'reduced':
            adaptive_insights = [
                f"Rangeland reduction of {percentage_change:.2f}% signals potential ecosystem stress.",
                "Possible causes: overgrazing, climate change, land-use conversion, or habitat fragmentation.",
                "Recommend comprehensive ecosystem assessment and adaptive management strategies."
            ]
        else:
            adaptive_insights = [
                f"Slight rangeland increase of {percentage_change:.2f}% might indicate ecosystem recovery efforts.",
                "Potential signals: improved land management, conservation initiatives, or climate adaptation.",
                "Continued monitoring crucial for understanding long-term ecological trends."
            ]

        return "\n".join(base_insights + adaptive_insights)

    def _generate_forest_insights(self, change_type, percentage_change):
        """AI-driven forest land change insights"""
        base_insights = [
            "Forest land transformations are critical indicators of global ecological health.",
            "Changes reflect complex interactions between human activities, climate dynamics, and ecosystem resilience."
        ]

        if change_type == 'reduced':
            adaptive_insights = [
                f"Dramatic forest land reduction of {percentage_change:.2f}% represents significant ecological concern.",
                "Potential implications: biodiversity loss, carbon sequestration disruption, climate change acceleration.",
                "Urgent need for comprehensive reforestation and conservation strategies."
            ]
        else:
            adaptive_insights = [
                f"Forest land changes of {percentage_change:.2f}% might indicate restoration efforts.",
                "Potential signals: successful conservation programs, changing land-use policies, or natural regeneration.",
                "Recommend continued monitoring and supporting forest ecosystem recovery."
            ]

        return "\n".join(base_insights + adaptive_insights)

    def _generate_water_insights(self, change_type, percentage_change):
        """AI-driven water area change insights"""
        base_insights = [
            "Water area transformations are crucial indicators of hydrological and ecological system dynamics.",
            "Changes reflect complex interactions between climate, human activities, and environmental processes."
        ]

        if change_type == 'reduced':
            adaptive_insights = [
                f"Water area reduction of {percentage_change:.2f}% suggests significant hydrological changes.",
                "Possible factors: climate change, water resource management, agricultural expansion, or ecosystem shifts.",
                "Urgent need for comprehensive watershed management and conservation strategies."
            ]
        else:
            adaptive_insights = [
                f"Water area stabilization or increase of {percentage_change:.2f}% might indicate positive environmental trends.",
                "Potential signals: improved water conservation, ecosystem restoration, or climate adaptation.",
                "Continued monitoring essential for understanding water resource dynamics."
            ]

        return "\n".join(base_insights + adaptive_insights)

def main():
    # Set paths
    model_path = '/content/drive/MyDrive/Satellite Dataset/eighty.keras'
    image_path1 = '/content/635741_sat.jpg'
    image_path2 = '/content/89743_sat.jpg'

    # Initialize analyzer
    analyzer = SatelliteImageAnalyzer(model_path)

    # Predict masks for both images
    mask1 = analyzer.predict_mask(image_path1, 'mask1.png')
    mask2 = analyzer.predict_mask(image_path2, 'mask2.png')

    # Compare images and generate change analysis
    change_analysis = analyzer.compare_images(
        f'/content/drive/MyDrive/Satellite Dataset/Predictions/mask1.png',
        f'/content/drive/MyDrive/Satellite Dataset/Predictions/mask2.png'
    )

    # Generate and print environmental insights
    if change_analysis:
        # Print Change Summary in multiple lines
        print("Change Summary:")
        for statement in change_analysis['change_statements']:
            print(statement)

        # Print Areas Difference
        print("\n--- Areas Difference Over the Period ---")
        print(change_analysis['areas_difference'])

        # Generate and print advanced environmental insights
        insights = analyzer.generate_environmental_insights(change_analysis)

        # Print Detailed Insights, including a check for water insights
        print("\n--- Environmental Insights ---")
        for category, insight in insights['detailed_insights'].items():
            print(f"\n### {category.replace('_', ' ').title()} Land Transformation")
            print(insight)

        # If no water insights were generated, specifically explain why
        if not any('water' in category.lower() for category in insights['detailed_insights']):
            print("\n### Water Land Transformation")
            print("No significant water area changes detected in this analysis.")

# Run the main function
if __name__ == '__main__':
    main()