In [None]:
!pip install ultralytics opencv-python numpy pycocotools

In [None]:
!pip install segmentation-models-pytorch torch torchvision albumentations
!pip install geopandas rasterio scikit-learn

In [6]:
import json
import os
from pathlib import Path
import shutil
from sklearn.model_selection import train_test_split
import geopandas as gpd
import numpy as np
import rasterio
from rasterio import features
from shapely.geometry import Polygon
from PIL import Image

# --- CONFIGURATION ---
RAW_DATA_DIR = Path("data/raw")
PROCESSED_DATA_DIR = Path("data/processed_unet")
SOLAFUNE_JSON_PATH = RAW_DATA_DIR / "train_annotations.json" # Assumed name

# --- MAIN SCRIPT ---

def parse_solafune_json_for_polygons(json_path):
    """
    Parses the proprietary Solafune JSON to extract polygons.
    This version is adapted for the user-provided JSON structure where
    annotations are nested within each image object.
    
    Returns:
        dict: Maps image filenames to their size and a list of polygons.
              Example: {'image1.png': {'size': (W, H), 'polygons': [Polygon(...),...]}}
    """
    annotations = {}
    with open(json_path, 'r') as f:
        data = json.load(f)

    # Iterate through the list of image objects
    for image_data in data.get('images',):
        filename = image_data['file_name']
        width = image_data['width']
        height = image_data['height']
        
        # Initialize the entry for this image
        annotations[filename] = {
            'size': (width, height),
            'polygons': [] 
        }
        
        # Iterate through the annotations nested within this image
        for ann in image_data.get('annotations',):
            # The segmentation is a flat list [x1, y1, x2, y2,...]
            flat_points = ann['segmentation']
            
            # Convert flat list to a list of (x, y) coordinate pairs
            points = list(zip(flat_points[::2], flat_points[1::2]))
            
            # A valid polygon needs at least 3 points
            if len(points) >= 3:
                annotations[filename]['polygons'].append(Polygon(points))
                
    return annotations

def create_and_save_masks(all_annotations, raw_image_dir, output_dir):
    """
    Creates binary masks from polygons and saves them as PNG files.
    """
    print("Creating and saving segmentation masks...")
    images_out_dir = output_dir / "images"
    masks_out_dir = output_dir / "masks"
    images_out_dir.mkdir(parents=True, exist_ok=True)
    masks_out_dir.mkdir(parents=True, exist_ok=True)

    for filename, data in all_annotations.items():
        # Copy original image to new directory
        shutil.copy(raw_image_dir / filename, images_out_dir / filename)

        # Create mask
        width, height = data['size']
        polygons = data['polygons']
        
        if not polygons:
            mask_array = np.zeros((height, width), dtype=np.uint8)
        else:
            # Rasterize the polygons into a mask array
            mask_array = features.rasterize(
                shapes=polygons,
                out_shape=(height, width),
                transform=rasterio.Affine.identity(),
                fill=0,
                all_touched=True,
                dtype=np.uint8
            )
        
        # Save mask as a grayscale PNG image
        mask_image = Image.fromarray(mask_array * 255) # Scale to 0-255 for image format
        mask_image.save(masks_out_dir / filename)

def main():
    # --- 1. Parse Annotations ---
    print("Parsing annotation file...")
    # Assumes raw images are in 'data/raw/train_images'
    raw_image_dir = RAW_DATA_DIR / "train_images"
    all_annotations = parse_solafune_json_for_polygons(SOLAFUNE_JSON_PATH)
    if not all_annotations:
        print("Parsing failed. Check the `parse_solafune_json_for_polygons` function.")
        return

    # --- 2. Create and Save Masks ---
    create_and_save_masks(all_annotations, raw_image_dir, PROCESSED_DATA_DIR)

    # --- 3. Split into Train/Validation sets ---
    # (This part can be handled by the PyTorch Dataset/DataLoader instead of moving files)
    print("\nData preparation complete.")
    print(f"Images and masks saved to: {PROCESSED_DATA_DIR}")

if __name__ == "__main__":
    main()

Parsing annotation file...
Creating and saving segmentation masks...

Data preparation complete.
Images and masks saved to: data\processed_unet


In [7]:
import os
import numpy as np
import torch
import segmentation_models_pytorch as smp
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from sklearn.model_selection import train_test_split
from pathlib import Path
import albumentations as A # <-- Import Albumentations
from albumentations.pytorch import ToTensorV2 # <-- Import the PyTorch tensor converter

# --- CONFIGURATION ---
DATA_DIR = Path("data/processed_unet")
ENCODER = 'resnet34'
ENCODER_WEIGHTS = 'imagenet'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
EPOCHS = 50
BATCH_SIZE = 8

# --- 1. DEFINE AUGMENTATION PIPELINES ---
# Define a strong set of augmentations for the training set
train_transform = A.Compose()

# For validation, we only need to normalize and convert to a tensor
val_transform = A.Compose()


# --- 2. UPDATE PYTORCH DATASET CLASS ---
class TreeCanopyDataset(Dataset):
    def __init__(self, image_paths, mask_paths, transform=None):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = np.array(Image.open(self.image_paths[idx]).convert("RGB"))
        mask = np.array(Image.open(self.mask_paths[idx]).convert("L"), dtype=np.float32)
        
        # The mask should be binary (0.0 or 1.0)
        mask = mask / 255.0
        
        if self.transform:
            # Apply augmentations
            transformed = self.transform(image=image, mask=mask)
            image = transformed['image']
            mask = transformed['mask'].unsqueeze(0) # Add channel dimension for the mask

        return image, mask

# --- MAIN SCRIPT (MODIFIED PART) ---
def main():
    # --- 1. Prepare Data Paths and Splits ---
    all_image_paths = sorted(list((DATA_DIR / "images").glob("*.png")))
    all_mask_paths = sorted(list((DATA_DIR / "masks").glob("*.png")))

    train_imgs, val_imgs, train_msks, val_msks = train_test_split(
        all_image_paths, all_mask_paths, test_size=0.2, random_state=42
    )

    # Apply the respective transforms to the datasets
    train_dataset = TreeCanopyDataset(train_imgs, train_msks, transform=train_transform)
    val_dataset = TreeCanopyDataset(val_imgs, val_msks, transform=val_transform)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    #... (The rest of the training loop remains the same)...
    
    # --- 2. Create Model ---
    model = smp.Unet(
        encoder_name=ENCODER, 
        encoder_weights=ENCODER_WEIGHTS,
        in_channels=3,
        classes=1,
        activation='sigmoid',
    )
    model.to(DEVICE)

    # --- 3. Define Loss and Optimizer ---
    loss = smp.losses.DiceLoss(mode='binary')
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

    # --- 4. Training Loop ---
    print(f"Starting training on {DEVICE}...")
    best_iou = 0

    for epoch in range(EPOCHS):
        model.train()
        epoch_loss = 0
        for images, masks in train_loader:
            images, masks = images.to(DEVICE), masks.to(DEVICE)
            
            optimizer.zero_grad()
            outputs = model(images)
            l = loss(outputs, masks)
            l.backward()
            optimizer.step()
            
            epoch_loss += l.item()

        # Validation
        model.eval()
        val_iou = 0
        with torch.no_grad():
            for images, masks in val_loader:
                images, masks = images.to(DEVICE), masks.to(DEVICE)
                outputs = model(images)
                tp, fp, fn, tn = smp.metrics.get_stats(outputs, masks.long(), mode='binary')
                iou = smp.metrics.iou_score(tp, fp, fn, tn, reduction='micro')
                val_iou += iou.item()

        avg_train_loss = epoch_loss / len(train_loader)
        avg_val_iou = val_iou / len(val_loader)

        print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {avg_train_loss:.4f}, Val IoU: {avg_val_iou:.4f}")

        if avg_val_iou > best_iou:
            best_iou = avg_val_iou
            torch.save(model.state_dict(), 'best_unet_model.pth')
            print("   -> Best model saved!")

if __name__ == "__main__":
    main()

  from .autonotebook import tqdm as notebook_tqdm


TypeError: Compose.__init__() missing 1 required positional argument: 'transforms'

# Catgpt

In [11]:
import json
import shutil
from pathlib import Path
import numpy as np
from PIL import Image
from rasterio import features
import rasterio
from shapely.geometry import Polygon

# --- CONFIGURATION ---
RAW_DATA_DIR = Path("data/raw")
PROCESSED_DATA_DIR = Path("data/processed_unet")
SOLAFUNE_JSON_PATH = RAW_DATA_DIR / "train_annotations.json"

# --- MAIN SCRIPT ---

def parse_solafune_json_for_polygons(json_path):
    """
    Parses the proprietary Solafune JSON to extract polygons.
    Returns a dict: {'image1.png': {'size': (W,H), 'polygons': [Polygon(...), ...]}}
    """
    annotations = {}
    with open(json_path, 'r') as f:
        data = json.load(f)

    for image_data in data.get('images', []):
        filename = image_data['file_name']
        width = image_data['width']
        height = image_data['height']
        
        annotations[filename] = {'size': (width, height), 'polygons': []}
        
        for ann in image_data.get('annotations', []):
            flat_points = ann['segmentation']
            points = list(zip(flat_points[::2], flat_points[1::2]))
            if len(points) >= 3:
                annotations[filename]['polygons'].append(Polygon(points))
                
    return annotations

def create_and_save_masks(all_annotations, raw_image_dir, output_dir):
    """
    Creates binary masks from polygons and saves them as PNGs.
    """
    print("Creating and saving segmentation masks...")
    images_out_dir = output_dir / "images"
    masks_out_dir = output_dir / "masks"
    images_out_dir.mkdir(parents=True, exist_ok=True)
    masks_out_dir.mkdir(parents=True, exist_ok=True)

    for filename, data in all_annotations.items():
        shutil.copy(raw_image_dir / filename, images_out_dir / filename)
        width, height = data['size']
        polygons = data['polygons']
        
        if not polygons:
            mask_array = np.zeros((height, width), dtype=np.uint8)
        else:
            mask_array = features.rasterize(
                shapes=polygons,
                out_shape=(height, width),
                transform=rasterio.Affine.identity(),
                fill=0,
                all_touched=True,
                dtype=np.uint8
            )
        mask_image = Image.fromarray(mask_array * 255)
        mask_image.save(masks_out_dir / filename)

def main():
    raw_image_dir = RAW_DATA_DIR / "train_images"
    all_annotations = parse_solafune_json_for_polygons(SOLAFUNE_JSON_PATH)
    if not all_annotations:
        print("Parsing failed. Check JSON file.")
        return

    create_and_save_masks(all_annotations, raw_image_dir, PROCESSED_DATA_DIR)

    print("\nData preparation complete.")
    print(f"Images and masks saved to: {PROCESSED_DATA_DIR}")

if __name__ == "__main__":
    main()


Creating and saving segmentation masks...

Data preparation complete.
Images and masks saved to: data\processed_unet


In [None]:
import os
import numpy as np
import torch
import segmentation_models_pytorch as smp
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from sklearn.model_selection import train_test_split
from pathlib import Path
import albumentations as A
from albumentations.pytorch import ToTensorV2

# --- CONFIGURATION ---
DATA_DIR = Path("data/processed_unet")
ENCODER = 'resnet34'
ENCODER_WEIGHTS = 'imagenet'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
EPOCHS = 50
BATCH_SIZE = 8

# --- 1. AUGMENTATION PIPELINES ---
train_transform = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.RandomRotate90(p=0.5),
    A.Transpose(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=15, border_mode=0, p=0.5),
    A.RandomBrightnessContrast(p=0.3),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

val_transform = A.Compose([
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

# --- 2. PYTORCH DATASET ---
class TreeCanopyDataset(Dataset):
    def __init__(self, image_paths, mask_paths, transform=None):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = np.array(Image.open(self.image_paths[idx]).convert("RGB"))
        mask = np.array(Image.open(self.mask_paths[idx]).convert("L"), dtype=np.float32) / 255.0
        
        if self.transform:
            transformed = self.transform(image=image, mask=mask)
            image = transformed['image']
            mask = transformed['mask'].unsqueeze(0)  # Add channel dimension

        return image, mask

# --- 3. TRAINING LOOP ---
def main():
    all_image_paths = sorted(list((DATA_DIR / "images").glob("*.tif")))
    all_mask_paths = sorted(list((DATA_DIR / "masks").glob("*.tif")))

    print("Number of images found:", len(all_image_paths))
    print("Number of masks found:", len(all_mask_paths))

    train_imgs, val_imgs, train_msks, val_msks = train_test_split(
        all_image_paths, all_mask_paths, test_size=0.2, random_state=42
    )

    train_dataset = TreeCanopyDataset(train_imgs, train_msks, transform=train_transform)
    val_dataset = TreeCanopyDataset(val_imgs, val_msks, transform=val_transform)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    # Model, Loss, Optimizer
    model = smp.Unet(encoder_name=ENCODER, encoder_weights=ENCODER_WEIGHTS,
                     in_channels=3, classes=1, activation='sigmoid')
    model.to(DEVICE)

    loss_fn = smp.losses.DiceLoss(mode='binary')
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

    print(f"Starting training on {DEVICE}...")
    best_iou = 0

    for epoch in range(EPOCHS):
        model.train()
        epoch_loss = 0
        for images, masks in train_loader:
            images, masks = images.to(DEVICE), masks.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(images)
            loss = loss_fn(outputs, masks)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        # Validation
        model.eval()
        val_iou = 0
        with torch.no_grad():
            for images, masks in val_loader:
                images, masks = images.to(DEVICE), masks.to(DEVICE)
                outputs = model(images)
                tp, fp, fn, tn = smp.metrics.get_stats(outputs, masks.long(), mode='binary')
                val_iou += smp.metrics.iou_score(tp, fp, fn, tn, reduction='micro').item()

        avg_train_loss = epoch_loss / len(train_loader)
        avg_val_iou = val_iou / len(val_loader)
        print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {avg_train_loss:.4f}, Val IoU: {avg_val_iou:.4f}")

        if avg_val_iou > best_iou:
            best_iou = avg_val_iou
            torch.save(model.state_dict(), 'best_unet_model.pth')
            print("   -> Best model saved!")

if __name__ == "__main__":
    main()


Number of images found: 150
Number of masks found: 150


To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


Starting training on cpu...
