This notebook was executed on Kaggle. Paths and outputs remain as in the original Kaggle notebook due to computational limitations. You can view the original notebook here: https://www.kaggle.com/code/mrhendley/solar-panels-segmentation-u-net-efficientnet-512


# I. Import libraries and GPU configuration
___

In [None]:
# Install the segmentation_models package, imported as `sm` in the next cell.
!pip install segmentation-models

In [None]:
import numpy as np 
import pandas as pd 
import shutil
import tensorflow as tf
from zipfile import ZipFile 
import keras.backend as K
from tensorflow.keras.callbacks import Callback, EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import os
os.environ['SM_FRAMEWORK'] = 'tf.keras'
import segmentation_models as sm
from PIL import Image

In [None]:
print("--- HARDWARE VERIFICATION ---")

# Ask TensorFlow which physical GPUs it can see.
gpus = tf.config.list_physical_devices('GPU')

if not gpus:
    print("Warning: No GPU detected. Training will run on the CPU and will be slow.")
else:
    # Only the first detected GPU is configured.
    gpu_device = gpus[0]
    try:
        # Restrict TensorFlow to that single device.
        tf.config.set_visible_devices(gpu_device, 'GPU')
        # Memory growth makes TensorFlow allocate GPU memory on demand
        # instead of reserving it all up front, which helps with 512x512 inputs.
        tf.config.experimental.set_memory_growth(gpu_device, True)
    except RuntimeError as config_error:
        # Raised when the configuration is attempted too late
        # (e.g. after the runtime has already been initialised).
        print(f"Error configuring GPU: {config_error}")
        print("Ensure this cell runs before building any model.")
    else:
        print(f"Success: GPU detected and configured: {gpu_device.name}")
        print("Training will use the graphics accelerator.")

# Report the devices TensorFlow will actually use.
print("\n--- CONFIGURATION STATUS ---")
print(f"Visible devices for TF: {tf.config.get_visible_devices()}")

# II. Utility Functions
---

In [None]:
def prepare_dataframe(image_path, name):
    """Index every file found below ``image_path`` by its file stem.

    Args:
        image_path: Root directory that is walked recursively.
        name: Name of the column that will hold the full file paths.

    Returns:
        A DataFrame indexed by ``id`` (the part of the file name before the
        first dot) with a single column ``name`` containing the file paths.
    """
    records = [
        (fname.split(".")[0], os.path.join(folder, fname))
        for folder, _, fnames in os.walk(image_path)
        for fname in fnames
    ]
    ids = [stem for stem, _ in records]
    locations = [location for _, location in records]

    frame = pd.DataFrame(data={"id": ids, name: locations})
    return frame.set_index('id')

In [None]:
def display(display_list):
    """Plot up to three arrays side by side in a single row.

    The panels are titled, in order, 'Input Image', 'True Mask' and
    'Predicted Mask'; each entry is converted to an image before plotting.
    """
    title = ['Input Image', 'True Mask', 'Predicted Mask']
    n_panels = len(display_list)

    plt.figure(figsize=(15, 15))
    for position, _ in enumerate(display_list, start=1):
        plt.subplot(1, n_panels, position)
        plt.title(title[position - 1])
        panel = display_list[position - 1]
        plt.imshow(tf.keras.preprocessing.image.array_to_img(panel))
        plt.axis('off')
    plt.show()

# III. Dataset Preparation
___

In [None]:
# Create the output folders for the converted images and masks, relative to the current working directory.
# NOTE(review): presumably these resolve to /kaggle/working/train and /kaggle/working/train_masks,
# which the next cell also creates when missing - confirm the working directory.
!mkdir train
!mkdir train_masks

In [None]:
# Copy every source image into the image folder and every label into the
# mask folder, saved as PNG under the same base name as its corresponding image.

root_dir = '/kaggle/input/solar-panel-detection-and-identification/PV03'
resolution = 'PV03'  # kept for reference; not used to build data_dir
data_dir = os.path.join(root_dir)

image_root = '/kaggle/working/train'
label_root = '/kaggle/working/train_masks'
# exist_ok keeps the cell re-runnable; makedirs also creates missing parents.
os.makedirs(image_root, exist_ok=True)
os.makedirs(label_root, exist_ok=True)

# Collect every file below data_dir, then split labels from images by name.
all_files = [
    os.path.join(dirpath, file)
    for dirpath, _, filenames in os.walk(data_dir)
    for file in filenames
]
labels = [path for path in all_files if '_label.bmp' in path]
images = [path for path in all_files if '_label.bmp' not in path]

for img_path in images:
    # The context manager closes the source file as soon as the pixel data
    # has been copied into the converted image.
    with Image.open(img_path) as source:
        img = source.convert("RGB")
    img = img.resize((512, 512), resample=Image.BILINEAR)
    dst = os.path.join(image_root, os.path.basename(img_path).replace('.bmp', '.png'))
    img.save(dst, 'PNG')

for label_path in labels:
    # Force a single channel.
    with Image.open(label_path) as source:
        img = source.convert("L")

    # Resize WITHOUT interpolation so no intermediate grey values appear.
    img = img.resize((512, 512), resample=Image.NEAREST)

    # Binarize explicitly: any non-zero pixel becomes 255.
    img = np.array(img)
    img = (img > 0).astype(np.uint8) * 255
    img = Image.fromarray(img)

    # Drop the "_label" suffix so the mask shares its image's file name.
    file_name = os.path.basename(label_path).replace('_label.bmp', '.png')
    dst = os.path.join(label_root, file_name)
    img.save(dst, 'PNG')

In [None]:
# Report how many files ended up in each output folder.
for caption, folder in (
    ("Train set:  ", "/kaggle/working/train"),
    ("Train masks:", "/kaggle/working/train_masks"),
):
    print(caption, len(os.listdir(folder)))

In [None]:
# Build one DataFrame per folder; both are indexed by the file stem ("id").
df = prepare_dataframe('/kaggle/working/train', "solar_path")
mask_df = prepare_dataframe('/kaggle/working/train_masks', "mask_path")
# Column assignment aligns on the shared "id" index, pairing each image with its mask.
# NOTE(review): an image without a matching mask file would get NaN here - confirm both folders are in sync.
df["mask_path"] = mask_df["mask_path"]

# IV. Data Pre-processing and Augmentation
___

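We now normalize the image pixel values to the range 0–1 and apply simple random augmentations to the training data: horizontal and vertical flips, rotations by multiples of 90 degrees, and slight brightness/contrast changes.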

In [None]:
# Definition of the output image size. All images will be resized to 512x512.
# Read as a global by preprocessing, where it is the resize target for images and masks.
img_size = [512, 512] 

def data_augmentation(solar_img, mask_img):
    """Randomly augment an image together with its mask.

    Geometric changes (horizontal flip, vertical flip, rotation by a random
    number of quarter turns) are applied identically to both tensors so they
    stay aligned. Brightness and contrast jitter touch the image only.

    Returns:
        The augmented ``(solar_img, mask_img)`` pair.
    """
    # Horizontal mirror, taken when a uniform draw exceeds 0.5.
    mirror_horizontally = tf.random.uniform(()) > 0.5
    if mirror_horizontally:
        solar_img, mask_img = (
            tf.image.flip_left_right(solar_img),
            tf.image.flip_left_right(mask_img),
        )

    # Vertical mirror, decided by an independent draw.
    mirror_vertically = tf.random.uniform(()) > 0.5
    if mirror_vertically:
        solar_img, mask_img = (
            tf.image.flip_up_down(solar_img),
            tf.image.flip_up_down(mask_img),
        )

    # Same number of 90-degree turns (0 to 3) for image and mask.
    quarter_turns = tf.random.uniform((), minval=0, maxval=4, dtype=tf.int32)
    solar_img = tf.image.rot90(solar_img, k=quarter_turns)
    mask_img = tf.image.rot90(mask_img, k=quarter_turns)

    # Photometric jitter is image-only; the mask must keep its values.
    jittered = tf.image.random_brightness(solar_img, 0.1)
    solar_img = tf.image.random_contrast(jittered, 0.9, 1.1)

    return solar_img, mask_img

def preprocessing(solar_path, mask_path):
    """Load one image/mask pair from disk as float32 tensors.

    Args:
        solar_path: Path of the PNG image, decoded with 3 channels.
        mask_path: Path of the PNG mask, decoded with 1 channel.

    Returns:
        ``(solar_img, mask_img)``: the image resized to ``img_size`` and
        divided by 255, and the mask resized with nearest-neighbour sampling
        and binarized (1.0 where the pixel is > 0, else 0.0).
    """
    # --- IMAGE ---
    rgb = tf.image.decode_png(tf.io.read_file(solar_path), channels=3)
    rgb = tf.image.resize(rgb, img_size)
    solar_img = tf.cast(rgb, tf.float32) / 255.0

    # --- MASK ---
    raw_mask = tf.image.decode_png(tf.io.read_file(mask_path), channels=1)
    # Nearest-neighbour keeps the mask free of interpolated values.
    raw_mask = tf.image.resize(
        raw_mask,
        img_size,
        method=tf.image.ResizeMethod.NEAREST_NEIGHBOR,
    )
    mask_img = tf.cast(raw_mask > 0, tf.float32)

    return solar_img, mask_img


# Size of the shuffle buffer used by create_dataset for the training pipeline.
# NOTE(review): presumably the total number of image/mask pairs - confirm against the counts
# printed after dataset preparation. A buffer at least as large as the training set gives a full shuffle.
DATASET_SIZE = 2308

def create_dataset(df, train=False):
    """Build an unbatched tf.data pipeline of (image, mask) tensors.

    Args:
        df: DataFrame with ``solar_path`` and ``mask_path`` columns.
        train: When True the pipeline is shuffled and augmented; otherwise
            the decoded samples are cached for repeated evaluation passes.

    Returns:
        A ``tf.data.Dataset`` yielding the pairs produced by ``preprocessing``
        (and by ``data_augmentation`` when ``train`` is True).
    """
    ds = tf.data.Dataset.from_tensor_slices(
        (df["solar_path"].values, df["mask_path"].values)
    )

    if train:
        # Shuffle the path strings BEFORE decoding: the buffer then holds
        # DATASET_SIZE short strings instead of DATASET_SIZE decoded
        # 512x512 float tensors, which would cost several GB of RAM.
        ds = ds.shuffle(DATASET_SIZE)

    ds = ds.map(preprocessing, num_parallel_calls=tf.data.AUTOTUNE)

    if train:
        # Augmentation stays uncached so every epoch sees fresh random draws.
        ds = ds.map(data_augmentation, num_parallel_calls=tf.data.AUTOTUNE)
    else:
        # Validation samples are deterministic, so decode them only once.
        ds = ds.cache()

    ds = ds.prefetch(tf.data.AUTOTUNE)

    return ds


In [None]:
# Split the paired paths into training (80%) and validation (20%) sets with a fixed seed,
# then build one pipeline per split (only the training one is shuffled and augmented).
train_df, valid_df = train_test_split(df, random_state=27, test_size=.2)
train = create_dataset(train_df, train = True)
valid = create_dataset(valid_df, train=False)

In [None]:
# Sanity check on one training sample: shape, dtype and value range of the
# image, then the shape and the distinct values of the mask.
solar, mask = next(iter(train))

solar_min = tf.reduce_min(solar).numpy()
solar_max = tf.reduce_max(solar).numpy()
print("Imagen:", solar.shape, solar.dtype, solar_min, solar_max)

mask_values = tf.unique(tf.reshape(mask, [-1]))[0].numpy()
print("Máscara:", mask.shape, mask_values)

In [None]:
# Number of training rows; model.fit derives steps_per_epoch from it.
TRAIN_LENGTH = len(train_df)
# Samples per batch for both the training and validation pipelines.
BATCH_SIZE = 4
# NOTE(review): BUFFER_SIZE is not referenced in the visible cells (create_dataset shuffles
# with DATASET_SIZE) - confirm whether it is still needed.
BUFFER_SIZE = TRAIN_LENGTH

In [None]:
# Batch the training stream and repeat it indefinitely; model.fit bounds each epoch via steps_per_epoch.
train_dataset = train.batch(BATCH_SIZE).repeat()
# Validation is only batched, so each evaluation is one finite pass.
valid_dataset = valid.batch(BATCH_SIZE)

In [None]:
# Let's look at a few training images and their corresponding masks.
# One take(5) pass shows five samples. The previous range(5)/take(i) nesting
# rebuilt the shuffled pipeline five times and displayed 0+1+2+3+4 samples.
for image, mask in train.take(5):
    sample_image, sample_mask = image, mask
    display([sample_image, sample_mask])
    print("Images:", image.shape, image.dtype)
    print("Masks:", mask.shape, mask.dtype)

# V. Model
---

We are going to use a U-Net model. A U-Net consists of an encoder (downsampler) and a decoder (upsampler). In order to learn robust features and reduce the number of trainable parameters, a pretrained model can be used as the encoder. Here the encoder is a pretrained efficientnetb1 model, built through the segmentation_models library (`sm.Unet`).

In [None]:
# --- GLOBAL PARAMETER CONFIGURATION ---
# Encoder name handed to sm.Unet as `backbone_name`.
BACKBONE = 'efficientnetb1' 
# Model input shape; matches the 512x512, 3-channel tensors produced by preprocessing.
INPUT_SHAPE = (512, 512, 3) 
# Number of output channels passed to sm.Unet as `classes`.
CLASSES = 1 
UNIFIED_LEARNING_RATE = 3e-4 # Low LR for stable fine-tuning
# Local weights file passed to sm.Unet as `encoder_weights`.
# NOTE(review): the file name says "b2" while BACKBONE is 'efficientnetb1' - confirm these
# weights really belong to the selected backbone.
LOCAL_ENCODER_WEIGHTS = '/kaggle/input/efficientnet-keras-weights/imagenet_1000/b2_notop.h5'

# --- LOSS AND METRICS ---

def dice_coef(y_true, y_pred, smooth=1e-6):
    """Return the mean Dice coefficient over the batch.

    For each sample the score is ``(2 * sum(y_true * y_pred) + smooth) /
    (sum(y_true) + sum(y_pred) + smooth)``, summed over axes 1, 2 and 3;
    the per-sample scores are then averaged over axis 0.

    Args:
        y_true: Ground-truth masks.
        y_pred: Predicted masks.
        smooth: Small constant that keeps the ratio defined for empty masks.
    """
    per_sample_axes = [1, 2, 3]
    overlap = K.sum(y_true * y_pred, axis=per_sample_axes)
    # Sum of both mask areas (the Dice denominator, not a set union).
    total_area = K.sum(y_true, axis=per_sample_axes) + K.sum(y_pred, axis=per_sample_axes)
    per_sample_dice = (2. * overlap + smooth) / (total_area + smooth)
    return K.mean(per_sample_dice, axis=0)

# Shared BinaryCrossentropy instance, created once and reused on every loss call.
bce = tf.keras.losses.BinaryCrossentropy()

def combined_loss(y_true, y_pred):
    """Return the equally weighted sum of Dice loss and binary cross-entropy.

    The Dice loss is ``1 - dice_coef(y_true, y_pred)``; each of the two
    terms contributes with weight 0.5.
    """
    dice_term = 1 - dice_coef(y_true, y_pred)
    bce_term = bce(y_true, y_pred)
    return 0.5 * dice_term + 0.5 * bce_term


# --- 2. U-Net with efficientnetb1 Model Definition ---
def build_unet_efficientnetb1(input_shape, classes, backbone):
    """Build a U-Net model through segmentation_models.

    Despite the function name, the encoder is whichever backbone ``backbone``
    names. Encoder weights are read from the module-level
    ``LOCAL_ENCODER_WEIGHTS`` path and the output uses a sigmoid activation.

    Args:
        input_shape: Input shape forwarded to ``sm.Unet``.
        classes: Number of output channels.
        backbone: Backbone name forwarded as ``backbone_name``.

    Returns:
        The model returned by ``sm.Unet``.
    """
    print(f"Building U-Net model with backbone: {backbone}...")

    unet_kwargs = {
        "backbone_name": backbone,
        "input_shape": input_shape,
        "encoder_weights": LOCAL_ENCODER_WEIGHTS,  # transfer learning
        "classes": classes,
        "activation": 'sigmoid',  # sigmoid for binary output
    }
    return sm.Unet(**unet_kwargs)

# --- MODEL INSTANTIATION AND WEIGHT LOADING ---
model_unet_efficientnetb1 = build_unet_efficientnetb1(
    input_shape=INPUT_SHAPE,
    classes=CLASSES,
    backbone=BACKBONE
)

model = model_unet_efficientnetb1
checkpoint_filepath = 'unet_efficientnetb1.h5'

# Resume from a previous checkpoint when one exists. Loading stays best-effort,
# but a failure is reported with its cause instead of being silently swallowed,
# and KeyboardInterrupt/SystemExit are no longer caught (as a bare except did).
if os.path.isfile(checkpoint_filepath):
    try:
        model.load_weights(checkpoint_filepath)
    except Exception as load_error:
        print(f"WARNING: Could not load weights from {checkpoint_filepath} ({load_error}). Initializing from scratch.")
    else:
        print(f"INFO: Weights loaded from {checkpoint_filepath}. Resuming training.")
else:
    print("INFO: No saved weights found. Initializing from scratch.")

print("\n--- U-Net (efficientnetb1) MODEL INSTANTIATED ---")

In [None]:
# --- FREEZE BATCH-NORMALIZATION LAYERS ---
# This must run BEFORE compile(): Keras takes the `trainable` flags into
# account at compile time, so changing them afterwards requires a recompile.
# NOTE(review): the filter only matches layers whose name contains
# 'batch_normalization' - confirm against the layer names in model.summary().
for layer in model.layers:
    if 'batch_normalization' in layer.name:
        layer.trainable = False

# --- FINAL COMPILATION ---

model.compile(
    optimizer=Adam(learning_rate=UNIFIED_LEARNING_RATE),
    loss=combined_loss, # Combined Loss (Dice + BCE)
    metrics=[dice_coef, 'binary_accuracy', sm.metrics.iou_score]
)
print(f"Model compiled with final LR: {UNIFIED_LEARNING_RATE}")
print("--- FINAL COMPILATION COMPLETE ---")

# --- MODEL ARCHITECTURE SUMMARY ---
print("\n--- MODEL ARCHITECTURE SUMMARY (U-Net with efficientnetb1) ---")
model.summary()

# VI. Training the Model
___

Let's try out the model to see what it predicts before training.

In [None]:
# Take one training batch and keep its first image/mask pair as the fixed
# sample used by the prediction previews in this cell and in DisplayCallback.
for images, masks in train_dataset.take(1):
    for img, mask in zip(images, masks):
        sample_image = img
        sample_mask = mask
        break
def visualize(display_list):
    """Show the given arrays in one row, titled input / true mask / prediction."""
    plt.figure(figsize=(15, 15))
    title = ['Input Image', 'True Mask', 'Predicted Mask']
    total = len(display_list)
    for idx in range(total):
        plt.subplot(1, total, idx + 1)
        plt.title(title[idx])
        as_image = tf.keras.preprocessing.image.array_to_img(display_list[idx])
        plt.imshow(as_image)
        plt.axis('off')
    plt.show()

def show_predictions(sample_image, sample_mask):
    """Predict the mask for one image and plot image, true mask and prediction.

    Args:
        sample_image: A single image without batch axis.
        sample_mask: The ground-truth mask for that image.
    """
    pred_mask = model.predict(sample_image[tf.newaxis, ...])
    # Drop the batch axis by indexing instead of reshaping to the global
    # img_size, so the helper works for whatever size the model outputs.
    pred_mask = pred_mask[0]
    visualize([sample_image, sample_mask, pred_mask])
    
# Preview what the model predicts for the fixed sample.
show_predictions(sample_image, sample_mask)

In [None]:
# --- 1. DEFINITION OF PREDICTION CALLBACK ---
class DisplayCallback(Callback):
    """Keras callback that plots a sample prediction every fifth epoch."""

    def on_epoch_end(self, epoch, logs=None):
        # `epoch` is zero-based, so add one before testing the interval.
        completed_epochs = epoch + 1
        if completed_epochs % 5 != 0:
            return

        try:
            # Relies on the notebook-level show_predictions, sample_image and sample_mask.
            show_predictions(sample_image, sample_mask) 
        except NameError:
            print("\nWARNING: show_predictions/sample_data variables not found for DisplayCallback.")


# --- 2. UNIFIED CALLBACKS ---
UNIFIED_EPOCHS = 40 # Upper bound; EarlyStopping is expected to stop sooner

# Plot a sample prediction every few epochs.
prediction_preview = DisplayCallback()
# Halve the learning rate when val_loss has not improved for 3 epochs.
lr_scheduler = ReduceLROnPlateau(
    monitor="val_loss", factor=0.5, patience=3, min_lr=1e-6, verbose=1
)
# Stop after 5 epochs without a better val_dice_coef and restore the best weights.
early_stopper = EarlyStopping(
    monitor="val_dice_coef", patience=5, mode="max", restore_best_weights=True, verbose=1
)
# Write the weights to disk whenever val_dice_coef reaches a new maximum.
checkpointer = ModelCheckpoint(
    checkpoint_filepath,
    save_weights_only=True,
    monitor="val_dice_coef",
    mode="max",
    save_best_only=True,
    verbose=1,
)

callbacks_single_stage = [prediction_preview, lr_scheduler, early_stopper, checkpointer]

# --- 3. UNIFIED TRAINING ---

print(f"\nStarting unified training for {UNIFIED_EPOCHS} epochs with initial LR={UNIFIED_LEARNING_RATE}.")

# train_dataset repeats forever, so steps_per_epoch defines the epoch length.
history_unified = model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=UNIFIED_EPOCHS,
    steps_per_epoch=TRAIN_LENGTH // BATCH_SIZE,
    callbacks=callbacks_single_stage,
)

print("\nUnified training stage complete.")

# VII. Inference
---

In [None]:
# Show predictions for the first five validation samples in a single pass.
# The previous range(5)/take(i) nesting replayed the unshuffled validation set
# from its start on every pass, so the first sample was plotted four times.
for image, mask in valid.take(5):
    sample_image, sample_mask = image, mask
    show_predictions(sample_image, sample_mask)

# VIII. Save and Load the Model
---

In [None]:
# Save the model to an .h5 file.
# NOTE(review): this is the same path as checkpoint_filepath, so the best-weights file written
# by ModelCheckpoint during training is overwritten here - confirm that is intended.
model.save("unet_efficientnetb1.h5")
print("Saved model to disk")

In [None]:
def visualize_predicted(display_list):
    """Show the given arrays in one row, titled 'Input Image' and 'Predicted Mask'."""
    title = ['Input Image', 'Predicted Mask']
    columns = len(display_list)

    plt.figure(figsize=(15, 15))
    for column in range(1, columns + 1):
        plt.subplot(1, columns, column)
        plt.title(title[column - 1])
        rendered = tf.keras.preprocessing.image.array_to_img(display_list[column - 1])
        plt.imshow(rendered)
        plt.axis('off')
    plt.show()

def show_(sample_image):
    """Predict the mask for one unlabeled image, plot both and return the mask.

    Args:
        sample_image: A single image without batch axis.

    Returns:
        The predicted mask with the batch axis removed.
    """
    pred_mask = model.predict(sample_image[tf.newaxis, ...])
    # Drop the batch axis by indexing instead of reshaping to the global
    # img_size, so the helper works for whatever size the model outputs.
    pred_mask = pred_mask[0]
    visualize_predicted([sample_image, pred_mask])
    return pred_mask

In [None]:
# --- CONFIGURATION ---
image_folder = '/kaggle/input/samples/samples/'
TARGET_SIZE = (512, 512)

# --- ITERATION LOOP ---

# Sorted so the samples are processed in a reproducible order
# (os.listdir returns entries in arbitrary order).
for filename in sorted(os.listdir(image_folder)):
    if not filename.lower().endswith(".png"):
        continue

    file_path = os.path.join(image_folder, filename)

    print(f"Processing: {filename}")

    try:
        # Mirror the training conversion: convert to RGB first, then resize
        # with BILINEAR. The context manager closes the file afterwards.
        with Image.open(file_path) as source:
            sample_image = source.convert("RGB").resize(TARGET_SIZE, resample=Image.BILINEAR)
        # Scale to [0, 1] exactly as preprocessing does for training data.
        np_sample_image = np.array(sample_image, dtype=np.float32) / 255.0

        # Predict and visualize with the show_ helper.
        pred_mask = show_(np_sample_image) 

    except Exception as e:
        # Best-effort: report the failing file and continue with the next one.
        print(f"Error on {filename}: {e}")