# üåû Solar Panel Detection Using XYZ Satellite Tiles

This notebook automatically detects solar panels on building rooftops using freely available satellite imagery.

## Overview

**What it does:**
- Downloads high-resolution satellite imagery for each building (no API key needed)
- Runs AI model to detect solar panels on rooftops
- Outputs georeferenced point locations of detected solar panels

**Key Features:**
- ‚úÖ Free satellite imagery from Google/ESRI tile services
- ‚úÖ Parallel processing for speed
- ‚úÖ Saves all imagery for future reruns
- ‚úÖ Works with any building footprint dataset

**Typical Runtime:** 30-60 minutes for 100-200 buildings

## üì¶ 1. Install Required Packages

Run this cell if you haven't installed the required packages yet.

In [None]:
# Uncomment to install (run once)
# !pip install geoai-py geopandas rasterio earthengine-api pillow requests pandas

## üîß 2. Import Libraries

In [None]:
import geopandas as gpd
import geoai
import os
import math
from io import BytesIO
import requests
from PIL import Image
import numpy as np
import rasterio
import time
import pandas as pd
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings
warnings.filterwarnings('ignore')

print("‚úì All libraries imported successfully")

## ‚öôÔ∏è 3. Configuration

**Adjust these parameters for your use case:**

| Parameter | Description | Recommended Range |
|-----------|-------------|-------------------|
| `BUILDINGS_PATH` | Path to your building footprints GeoJSON | - |
| `OUTPUT_DIR` | Where to save results and imagery | - |
| `CONFIDENCE_THRESHOLD` | Minimum confidence for detection | 0.4-0.5 |
| `MIN_COVERAGE_PCT` | Minimum % of image that must be solar | 0.02-0.05 |
| `MAX_COVERAGE_PCT` | Maximum % to avoid detecting whole buildings | 0.3-0.4 |
| `MIN_OBJECT_AREA` | Minimum size in square metres | 10-20 |
| `OVERLAP` | Overlap between detection windows | 0.02 (2%) |
| `MAX_WORKERS` | Number of parallel threads | 4-8 |

In [None]:
# ============================================================================
# CONFIGURATION - EDIT THESE VALUES
# ============================================================================

# Input/Output paths
BUILDINGS_PATH = "path/to/your/buildings.geojson"  # üëà CHANGE THIS
OUTPUT_DIR = "output/solar_detection"

# Detection parameters
CONFIDENCE_THRESHOLD = 0.45
MIN_COVERAGE_PCT = 0.02
MAX_COVERAGE_PCT = 0.35
MIN_OBJECT_AREA = 10
OVERLAP = 0.02
MAX_WORKERS = 4

os.makedirs(OUTPUT_DIR, exist_ok=True)

print("="*70)
print("CONFIGURATION")
print("="*70)
print(f"Input: {BUILDINGS_PATH}")
print(f"Output: {OUTPUT_DIR}")
print(f"Confidence threshold: {CONFIDENCE_THRESHOLD}")
print(f"Coverage range: {MIN_COVERAGE_PCT*100}%-{MAX_COVERAGE_PCT*100}%")
print(f"Parallel workers: {MAX_WORKERS}")
print("="*70)

## üõ†Ô∏è 4. Helper Functions

These functions handle:
- Converting lat/lon to tile coordinates
- Downloading satellite imagery from XYZ tile services
- Stitching tiles together
- Centering imagery on building locations

### 4.1 Coordinate Conversion Functions

In [None]:
def lat_lon_to_tile(lat, lon, zoom):
    """Convert latitude/longitude to tile coordinates"""
    lat_rad = math.radians(lat)
    n = 2.0 ** zoom
    xtile = int((lon + 180.0) / 360.0 * n)
    ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
    return xtile, ytile

def tile_to_lat_lon(x, y, zoom):
    """Convert tile coordinates back to latitude/longitude"""
    n = 2.0 ** zoom
    lon_deg = x / n * 360.0 - 180.0
    lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * y / n)))
    lat_deg = math.degrees(lat_rad)
    return lat_deg, lon_deg

print("‚úì Coordinate conversion functions loaded")

### 4.2 Imagery Download Function

In [None]:
def download_centered_imagery(lat, lon, zoom=20, size_tiles=3):
    """Download satellite imagery centred on a specific location"""
    center_x, center_y = lat_lon_to_tile(lat, lon, zoom)
    tiles = []
    offset = size_tiles // 2
    
    for dy in range(-offset, offset + 1):
        row = []
        for dx in range(-offset, offset + 1):
            x = center_x + dx
            y = center_y + dy
            tile_img = None
            
            # Try Google Satellite
            try:
                url = f"https://mt1.google.com/vt/lyrs=s&x={x}&y={y}&z={zoom}"
                response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=5)
                if response.status_code == 200 and len(response.content) > 1000:
                    tile_img = Image.open(BytesIO(response.content))
            except:
                pass
            
            # Fallback to ESRI
            if tile_img is None:
                try:
                    url = f"https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{zoom}/{y}/{x}"
                    response = requests.get(url, timeout=5)
                    if response.status_code == 200 and len(response.content) > 1000:
                        tile_img = Image.open(BytesIO(response.content))
                except:
                    pass
            
            if tile_img is None:
                tile_img = Image.new('RGB', (256, 256), (0, 0, 0))
            
            row.append(tile_img)
        tiles.append(row)
    
    # Stitch tiles
    tile_size = 256
    full_width = tile_size * size_tiles
    full_height = tile_size * size_tiles
    stitched = Image.new('RGB', (full_width, full_height))
    
    for i, row in enumerate(tiles):
        for j, tile in enumerate(row):
            stitched.paste(tile, (j * tile_size, i * tile_size))
    
    # Calculate pixel position and crop
    top_left_lat, top_left_lon = tile_to_lat_lon(center_x - offset, center_y - offset, zoom)
    bottom_right_lat, bottom_right_lon = tile_to_lat_lon(center_x + offset + 1, center_y + offset + 1, zoom)
    
    lon_range = bottom_right_lon - top_left_lon
    lat_range = top_left_lat - bottom_right_lat
    
    pixel_x = int((lon - top_left_lon) / lon_range * full_width)
    pixel_y = int((top_left_lat - lat) / lat_range * full_height)
    
    crop_size = 512
    half_crop = crop_size // 2
    
    left = max(0, pixel_x - half_crop)
    top = max(0, pixel_y - half_crop)
    right = min(full_width, pixel_x + half_crop)
    bottom = min(full_height, pixel_y + half_crop)
    
    if right - left < crop_size:
        if left == 0:
            right = min(full_width, crop_size)
        else:
            left = max(0, right - crop_size)
    
    if bottom - top < crop_size:
        if top == 0:
            bottom = min(full_height, crop_size)
        else:
            top = max(0, bottom - crop_size)
    
    cropped = stitched.crop((left, top, right, bottom))
    
    if cropped.size != (crop_size, crop_size):
        cropped = cropped.resize((crop_size, crop_size), Image.Resampling.LANCZOS)
    
    return cropped

print("‚úì Imagery download function loaded")

## üìç 5. Load Building Data

In [None]:
print("Loading building footprints...")

buildings_gdf = gpd.read_file(BUILDINGS_PATH)

if buildings_gdf.crs != "EPSG:4326":
    print(f"  Reprojecting from {buildings_gdf.crs} to EPSG:4326...")
    buildings_gdf = buildings_gdf.to_crs("EPSG:4326")

print("\n" + "="*70)
print("BUILDING DATA LOADED")
print("="*70)
print(f"Total buildings: {len(buildings_gdf)}")
print(f"CRS: {buildings_gdf.crs}")
print(f"Bounds: {buildings_gdf.total_bounds}")
print("="*70)

In [None]:
buildings_gdf.head()

## üõ∞Ô∏è 6. Download Satellite Imagery

This step downloads satellite imagery for each building.
**Note:** This can take 20-40 minutes depending on the number of buildings.

In [None]:
imagery_dir = os.path.join(OUTPUT_DIR, "imagery")
os.makedirs(imagery_dir, exist_ok=True)

print("="*70)
print("DOWNLOADING SATELLITE IMAGERY")
print("="*70)
print(f"Target: {len(buildings_gdf)} buildings")
print(f"Zoom level: 20 (~0.6m resolution)")
print(f"Output size: 512√ó512 pixels per building")
print(f"Saving to: {imagery_dir}")
print("\nThis may take 20-40 minutes...")
print("="*70)
print()

downloaded = 0
skipped = 0
failed = 0
start_time = time.time()

for idx, building in buildings_gdf.iterrows():
    imagery_path = os.path.join(imagery_dir, f"building_{idx}.png")
    
    if os.path.exists(imagery_path):
        skipped += 1
        continue
    
    try:
        centroid = building.geometry.centroid
        img = download_centered_imagery(centroid.y, centroid.x, zoom=20, size_tiles=3)
        
        img_array = np.array(img)
        mean_brightness = img_array.mean()
        
        if mean_brightness < 10:
            print(f"  Building {idx}: Imagery too dark, skipping")
            failed += 1
            continue
        
        img.save(imagery_path)
        downloaded += 1
        
        if (downloaded + skipped) % 10 == 0:
            elapsed = time.time() - start_time
            rate = (downloaded + skipped) / elapsed
            remaining = (len(buildings_gdf) - downloaded - skipped) / rate if rate > 0 else 0
            
            print(f"  Progress: {downloaded + skipped}/{len(buildings_gdf)} "
                  f"(Downloaded: {downloaded}, Skipped: {skipped}) "
                  f"[~{remaining/60:.0f} min remaining]")
        
        time.sleep(0.1)
        
    except Exception as e:
        print(f"  Building {idx}: Error - {e}")
        failed += 1

total_time = time.time() - start_time

print("\n" + "="*70)
print("IMAGERY DOWNLOAD COMPLETE")
print("="*70)
print(f"Downloaded: {downloaded} new images")
print(f"Skipped: {skipped} (already existed)")
print(f"Failed: {failed}")
print(f"Total time: {total_time/60:.1f} minutes")
print("="*70)

## ü§ñ 7. Run Solar Panel Detection

In [None]:
def process_building(args):
    """Process a single building for solar panel detection"""
    idx, building, detector, imagery_dir, output_dir = args
    
    result = {'building_id': idx, 'success': False, 'has_solar': False}
    
    try:
        imagery_path = os.path.join(imagery_dir, f"building_{idx}.png")
        
        if not os.path.exists(imagery_path):
            result['error'] = 'no_imagery'
            return result
        
        centroid = building.geometry.centroid
        mask_path = os.path.join(output_dir, f"building_{idx}_mask.tif")
        
        detector.generate_masks(
            imagery_path,
            output_path=mask_path,
            confidence_threshold=CONFIDENCE_THRESHOLD,
            mask_threshold=CONFIDENCE_THRESHOLD,
            min_object_area=MIN_OBJECT_AREA,
            overlap=OVERLAP,
            chip_size=(512, 512),
            batch_size=4,
            verbose=False
        )
        
        with rasterio.open(mask_path) as src:
            mask_data = src.read(1)
            max_confidence = np.max(mask_data)
            
            if max_confidence > CONFIDENCE_THRESHOLD:
                solar_pixels = np.sum(mask_data > CONFIDENCE_THRESHOLD)
                coverage = solar_pixels / mask_data.size
                
                if MIN_COVERAGE_PCT < coverage < MAX_COVERAGE_PCT:
                    mean_confidence = np.mean(mask_data[mask_data > CONFIDENCE_THRESHOLD])
                    result['success'] = True
                    result['has_solar'] = True
                    result['confidence'] = float(max_confidence)
                    result['mean_confidence'] = float(mean_confidence)
                    result['coverage_pct'] = float(coverage * 100)
                    result['geometry'] = centroid
                    return result
        
        if os.path.exists(mask_path):
            os.remove(mask_path)
        
        result['success'] = True
        
    except Exception as e:
        result['error'] = str(e)
    
    return result

print("‚úì Processing function defined")

In [None]:
processing_dir = os.path.join(OUTPUT_DIR, "processing")
os.makedirs(processing_dir, exist_ok=True)

print("="*70)
print("RUNNING SOLAR PANEL DETECTION")
print("="*70)
print("Initializing AI model...")

detector = geoai.SolarPanelDetector()

print(f"‚úì Model loaded")
print(f"\nProcessing {len(buildings_gdf)} buildings...")
print(f"Using {MAX_WORKERS} parallel workers")
print("="*70)
print()

args_list = [(idx, row, detector, imagery_dir, processing_dir) 
             for idx, row in buildings_gdf.iterrows()]

results = []
solar_count = 0
start_time = time.time()

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {executor.submit(process_building, args): args[0] for args in args_list}
    
    for i, future in enumerate(as_completed(futures), 1):
        result = future.result()
        results.append(result)
        
        if result['has_solar']:
            solar_count += 1
        
        if i % 20 == 0:
            elapsed = time.time() - start_time
            rate = i / elapsed
            remaining = (len(buildings_gdf) - i) / rate if rate > 0 else 0
            
            print(f"  Progress: {i}/{len(buildings_gdf)} ({i/len(buildings_gdf)*100:.1f}%) - "
                  f"Solar: {solar_count} - [~{remaining/60:.0f} min remaining]")

total_time = time.time() - start_time

print("\n" + "="*70)
print("DETECTION COMPLETE")
print("="*70)
print(f"Total buildings: {len(buildings_gdf)}")
print(f"Solar panels detected: {solar_count}")
print(f"Detection rate: {solar_count/len(buildings_gdf)*100:.1f}%")
print(f"Processing time: {total_time/60:.1f} minutes")
print("="*70)

## üíæ 8. Save Results

In [None]:
solar_detections = [r for r in results if r['has_solar']]

if len(solar_detections) > 0:
    solar_gdf = gpd.GeoDataFrame(solar_detections, crs="EPSG:4326")
    solar_gdf['detection_date'] = datetime.now().strftime('%Y-%m-%d')
    solar_gdf['confidence_threshold'] = CONFIDENCE_THRESHOLD
    
    output_geojson = os.path.join(OUTPUT_DIR, "solar_panels_detected.geojson")
    solar_gdf.to_file(output_geojson, driver='GeoJSON')
    
    output_csv = os.path.join(OUTPUT_DIR, "solar_panels_detected.csv")
    solar_gdf['lon'] = solar_gdf.geometry.x
    solar_gdf['lat'] = solar_gdf.geometry.y
    csv_columns = ['building_id', 'confidence', 'mean_confidence', 'coverage_pct', 
                   'lat', 'lon', 'detection_date']
    solar_gdf[csv_columns].to_csv(output_csv, index=False)
    
    print("\n" + "="*70)
    print("RESULTS SAVED")
    print("="*70)
    print(f"‚úì GeoJSON: {output_geojson}")
    print(f"‚úì CSV: {output_csv}")
    print(f"‚úì Total detections: {len(solar_gdf)}")
    print("="*70)
else:
    print("\n‚ö† No solar panels detected")

## üìä 9. Summary Statistics

In [None]:
if len(solar_detections) > 0:
    print("\n" + "="*70)
    print("SUMMARY STATISTICS")
    print("="*70)
    print(f"\nMean confidence: {solar_gdf['confidence'].mean():.3f}")
    print(f"Median confidence: {solar_gdf['confidence'].median():.3f}")
    print(f"Mean coverage: {solar_gdf['coverage_pct'].mean():.1f}%")
    print("="*70)
    
    solar_gdf[['building_id', 'confidence', 'coverage_pct']].head(10)

## üó∫Ô∏è 10. Visualisation

In [None]:
if len(solar_detections) > 0:
    import matplotlib.pyplot as plt
    
    fig, ax = plt.subplots(figsize=(15, 10))
    
    buildings_gdf.plot(ax=ax, color='lightgray', edgecolor='black', 
                       alpha=0.5, linewidth=0.5, label='All buildings')
    
    solar_gdf.plot(ax=ax, column='confidence', cmap='YlOrRd',
                   markersize=100, alpha=0.8, legend=True,
                   legend_kwds={'label': 'Detection Confidence'})
    
    ax.set_title(f'Solar Panel Detection Results\n'
                 f'{len(solar_gdf)} detections out of {len(buildings_gdf)} buildings',
                 fontsize=16, fontweight='bold')
    ax.set_xlabel('Longitude')
    ax.set_ylabel('Latitude')
    ax.grid(True, alpha=0.3)
    
    map_path = os.path.join(OUTPUT_DIR, 'detection_map.png')
    plt.tight_layout()
    plt.savefig(map_path, dpi=300, bbox_inches='tight')
    plt.show()
    
    print(f"‚úì Map saved to: {map_path}")

## ‚úÖ 11. Completion Summary

In [None]:
print("\n" + "="*70)
print("üéâ PROCESSING COMPLETE!")
print("="*70)
print(f"\nResults:")
print(f"  üìÅ Output directory: {OUTPUT_DIR}")
print(f"  üìç Detections: {len(solar_detections)} / {len(buildings_gdf)} buildings")
print(f"  üìä Detection rate: {len(solar_detections)/len(buildings_gdf)*100:.1f}%")
print("\nNext Steps:")
print("  1. Review detections in GIS software")
print("  2. Use rerun notebook to test different parameters")
print("  3. Validate results against ground truth")
print("="*70)