# UAV Developable Zone Clustering: Processing Data

This notebook is made to do processing for UAV data that collected from public source. The process will include:
---

1.   Data Collection
2.   Preprocessing Data
3.   Feature Extraction
4.   Modeling
5.   Implementation
---

The goals for this process is to cluster data based on:
---

1.   Developable area for plants
2.   Developable area for constructions
3.   Developable area for both plants and constrcutions
4.   Non-developable area (already balanced)

**this notebook is made in Google Colab while intergrated with Github, it will be better to run this notebook on the same space.*

## Required Library

In [None]:
!pip install matplotlib numpy Pillow opencv-python scikit-image scikit-learn joblib

In [None]:
import os
import shutil
import numpy as np
import matplotlib.pyplot as plt
import cv2
import joblib
from PIL import Image
from skimage import color, filters, measure, morphology
from skimage.feature import canny
from skimage.color import rgb2gray
from skimage.morphology import closing, square
from mpl_toolkits.mplot3d import Axes3D
from sklearn.cluster import KMeans, DBSCAN
from collections import Counter

## Data Collection

Data that will be used comes from kaggle, the dataset is available to download at https://www.kaggle.com/datasets/ankit1743/skyview-an-aerial-landscape-dataset (also available in this repo as Aerial_Landscapes.zip).

### Download and Unzip

In [None]:
# Download dataset from github repo
!wget https://github.com/RML1812/uav-developable-zone-clustering/raw/refs/heads/main/Aerial_Landscapes.zip

In [None]:
# Unzip to /data folder
!unzip /content/Aerial_Landscapes.zip -d /content/data/

In [None]:
# Read directory of data and total image in each folder
data_dir = '/content/data'

for folder in os.listdir(data_dir):
  folder_path = os.path.join(data_dir, folder)
  if os.path.isdir(folder_path):
    image_count = len([f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))])
    print(f"Folder: {folder}, Total Images: {image_count}")

## Data Preprocessing

### Drop Data

Dropping data that's unnecessary or uncompatible against purpose of the clustering, which are:

*   **Terrain compatibility**: Desert
*   **Scape purpose**: Grassland, Forest
*   **Functional Structure**: Port, Airport, Agriculture, Railway, Highway


In [None]:
# Delete Desert, Grassland, Forest, Port, Airport, Agriculture, Railway, Highway folder form content/data folder
folders_to_delete = ['Desert', 'Grassland', 'Forest', 'Port', 'Airport', 'Agriculture', 'Railway', 'Highway']

for folder in folders_to_delete:
  folder_path = os.path.join(data_dir, folder)
  if os.path.exists(folder_path):
    shutil.rmtree(folder_path)
    print(f"Deleted folder: {folder_path}")
  else:
    print(f"Folder not found: {folder_path}")

In [None]:
# Read remaining data
for folder in os.listdir(data_dir):
  folder_path = os.path.join(data_dir, folder)
  if os.path.isdir(folder_path):
    image_count = len([f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))])
    print(f"Folder: {folder}, Total Images: {image_count}")

### Cleaning and Normalize Images

The dataset source says that the data is already clean. Hence, to ensure it's right, this process will check every image.

---
The **properties** are,
* Dimensions: 256x256
* Horizontal resolution: 96 dpi
* Vertical resolution: 96 dpi
* Bit depth: 24


In [None]:
# Define properties
EXPECTED_DIMENSIONS = (256, 256)
EXPECTED_HORIZONTAL_RESOLUTION = 96
EXPECTED_VERTICAL_RESOLUTION = 96

In [None]:
# Function to check every images and print check result
def check_image_properties(image_path):
  try:
    with Image.open(image_path) as img:
      # Check dimensions
      if img.size != EXPECTED_DIMENSIONS:
        return False

      # Check resolution (DPI)
      dpi = img.info.get('dpi', (0, 0))
      if dpi != (0, 0):  # Only check DPI if it's present
        if dpi[0] != EXPECTED_HORIZONTAL_RESOLUTION or dpi[1] != EXPECTED_VERTICAL_RESOLUTION:
          return False

      # Check bit depth (RGB = 24-bit)
      if img.mode != 'RGB':
        return False

    return True

  except Exception as e:
    print(f"Error checking image {image_path}: {e}")
    return False

def check_images_in_folder(folder_path):
  different_properties_count = 0
  total_images = 0

  for root, dirs, files in os.walk(folder_path):
    for file in files:
      if file.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.tiff')):
        image_path = os.path.join(root, file)
        total_images += 1
        if not check_image_properties(image_path):
          different_properties_count += 1

  if different_properties_count > 0:
    print(f"Total images with different properties: {different_properties_count}")
  else:
    print("All images have the correct properties.")

  print(f"Total images checked: {total_images}")

In [None]:
check_images_in_folder(data_dir)

Since all of the image is correct, any other normalizing/cleaning process won't be needed.

### Store Data

In [None]:
# Convert images to numpy arrays along with labels
def convert_to_numpy_with_labels(data_dir):
  images = []
  labels = []
  label_dict = {}
  label_counter = 0

  for folder_name in os.listdir(data_dir):
    folder_path = os.path.join(data_dir, folder_name)
    if os.path.isdir(folder_path):
      # Assign a unique integer label for each folder (class name)
      if folder_name not in label_dict:
        label_dict[folder_name] = label_counter
        label_counter += 1

      # Process each image file in the folder
      for filename in os.listdir(folder_path):
        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
          img_path = os.path.join(folder_path, filename)
          try:
            # Load image and convert to numpy array
            img = Image.open(img_path).convert("RGB")
            img_array = np.array(img)

            # Append the image data and its corresponding label
            images.append(img_array)
            labels.append(label_dict[folder_name])
          except Exception as e:
            print(f"Error processing image {img_path}: {e}")

  # Convert lists to numpy arrays
  images_np = np.array(images)
  labels_np = np.array(labels)

  return images_np, labels_np, label_dict

In [None]:
# Run convert function
images, labels, label_dict = convert_to_numpy_with_labels(data_dir)
print(f"Total images: {len(images)}")
print(f"Label dictionary: {label_dict}")

In [None]:
# Check detail of images
def print_total_images_per_label(labels_np, label_dict):
    # Get unique labels and their counts
    unique_labels, counts = np.unique(labels_np, return_counts=True)

    # Invert the label_dict to get class names from integer labels
    inverted_label_dict = {v: k for k, v in label_dict.items()}

    print("Total images per label:")
    for label, count in zip(unique_labels, counts):
        class_name = inverted_label_dict[label]
        print(f"Label '{class_name}' (ID {label}): {count} images")

print_total_images_per_label(labels, label_dict)

## Feature Extraction

Image's characteristic that will be considered are:

* Plants Area (color)
* Construction Area (edges)
* Vacant Area (remaining area)

The feature take shapes in percentage of each characteristic above.

### Extraction Process

In [None]:
# Function for extraction
def extract_features(image):
  # Convert to HSV for plants detection
  hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

  # Define range for detecting green (plants) in HSV
  lower_green = np.array([36, 25, 25])
  upper_green = np.array([70, 255, 255])
  mask_plants = cv2.inRange(hsv_image, lower_green, upper_green)

  # Refine the mask using morphological closing to remove noise
  mask_plants = morphology.closing(mask_plants, square(7))

  # Construction detection Canny edge detection
  gray_image = rgb2gray(image)
  edges = canny(gray_image, sigma=1)

  # Close the edges to make shapes continuous and fill in gaps
  edges_closed = morphology.binary_closing(edges, square(5))

  # Label connected regions (contours) in the construction mask
  labeled_construction = measure.label(edges_closed)

  # Filter out small regions based on area (keep only significant regions)
  props = measure.regionprops(labeled_construction)
  mask_construction = np.zeros_like(labeled_construction)

  for prop in props:
    if prop.area > 500:
      mask_construction[labeled_construction == prop.label] = 255

  # Vacant space: regions that are neither construction nor plants
  mask_vacant = np.logical_not(np.logical_or(mask_plants, mask_construction))

  # Label vacant space regions
  labeled_vacant = measure.label(mask_vacant)
  props_vacant = measure.regionprops(labeled_vacant)

  # Filter out small vacant areas based on area
  mask_large_vacant = np.zeros_like(mask_vacant, dtype=np.uint8)
  for prop in props_vacant:
    if prop.area > 500:  # Adjust the area threshold for vacant spaces
      mask_large_vacant[labeled_vacant == prop.label] = 255

  # Calculate percentages of each feature
  total_pixels = image.shape[0] * image.shape[1]
  percent_plants = np.sum(mask_plants > 0) / total_pixels * 100
  percent_construction = np.sum(mask_construction > 0) / total_pixels * 100
  percent_vacant = np.sum(mask_large_vacant > 0) / total_pixels * 100

  # Return the feature vector
  return [percent_vacant, percent_plants, percent_construction]

In [None]:
# Run extraction for all images
features = []
for image in images:
  features.append(extract_features(image))

features = np.array(features)

### Feature Visualization

In [None]:
# Function for feature visualization
def visualize_features(image, label, label_name):
  # Extract the masks using the same method as in feature extraction
  hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
  lower_green = np.array([30, 40, 40])
  upper_green = np.array([90, 255, 255])
  mask_plants = cv2.inRange(hsv_image, lower_green, upper_green)
  mask_plants = morphology.closing(mask_plants, square(7))

  gray_image = rgb2gray(image)
  edges = canny(gray_image, sigma=1)
  edges_closed = morphology.binary_closing(edges, square(5))

  labeled_construction = measure.label(edges_closed)
  props = measure.regionprops(labeled_construction)
  mask_construction = np.zeros_like(labeled_construction)
  for prop in props:
    if prop.area > 500:
      mask_construction[labeled_construction == prop.label] = 255

  mask_vacant = np.logical_not(np.logical_or(mask_plants, mask_construction))
  labeled_vacant = measure.label(mask_vacant)
  props_vacant = measure.regionprops(labeled_vacant)
  mask_large_vacant = np.zeros_like(mask_vacant, dtype=np.uint8)
  for prop in props_vacant:
    if prop.area > 500:
      mask_large_vacant[labeled_vacant == prop.label] = 255

  # Display the original image and masks
  plt.figure(figsize=(12, 6))

  plt.subplot(1, 4, 1)
  plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
  plt.title(f"Original Image: {label_name}")

  plt.subplot(1, 4, 2)
  plt.imshow(mask_plants, cmap='Greens')
  plt.title("Plants Mask")

  plt.subplot(1, 4, 3)
  plt.imshow(mask_construction, cmap='gray')
  plt.title("Construction Mask")

  plt.subplot(1, 4, 4)
  plt.imshow(mask_large_vacant, cmap='Blues')
  plt.title("Vacant Space Mask")

  plt.show()

In [None]:
# Run visualization
for label_name, label_idx in label_dict.items():
  image_indices = np.where(labels == label_idx)[0]

  if len(image_indices) > 0:
    image_idx = image_indices[0]
    image = images[image_idx]
    visualize_features(image, label_idx, label_name)
  else:
    print(f"No images found for label '{label_name}' (index {label_idx}).")

## Clustering

### KMeans

In [None]:
# KMeans Clustering function with 3D Plot and original labels coloring
def cluster_kmeans_3d(features, labels, label_dict, n_clusters=4):
    # Fit the KMeans model
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    kmeans = kmeans.fit_predict(features)

    # Prepare the 3D scatter plot
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')

    # Use the original image labels for coloring
    unique_labels = np.unique(labels)
    colors = plt.cm.rainbow(np.linspace(0, 1, len(unique_labels)))

    # Get the keys (label names) that correspond to the values in labels
    for i, label in enumerate(unique_labels):
        idx = np.where(labels == label)

        # Find the label name by looking up the value in label_dict
        label_name = [key for key, val in label_dict.items() if val == label]

        if label_name:  # If the label name exists
            ax.scatter(features[idx, 0], features[idx, 1], features[idx, 2], color=colors[i], label=label_name[0])
        else:
            print(f"Warning: Label {label} not found in label_dict")

    ax.set_xlabel('Percent Vacant')
    ax.set_ylabel('Percent Plants')
    ax.set_zlabel('Percent Construction')
    ax.set_title('KMeans Clustering (3D)')
    ax.legend()
    plt.show()

    # Print the number of labels in each cluster
    count_labels = Counter(kmeans)
    print("KMeans Cluster Counts:")
    for cluster, count in count_labels.items():
        print(f"Cluster {cluster}: {count} images")

    return kmeans

In [None]:
# Run KMeans and visualize
kmeans = cluster_kmeans_3d(features, labels, label_dict)

## Export Model

In [None]:
joblib.dump(kmeans, '/content/kmeans_model.pkl')
print("KMeans model saved as: kmeans_model.pkl")