In [23]:
%pip install\
requests mercantile numpy==1.26.4 pandas pillow shapely opencv-python matplotlib torch torchvision torchaudio ultralytics mlflow

Collecting mlflow
  Downloading mlflow-3.6.0-py3-none-any.whl (8.9 MB)
     ---------------------------------------- 0.0/8.9 MB ? eta -:--:--
      --------------------------------------- 0.1/8.9 MB 3.3 MB/s eta 0:00:03
     -- ------------------------------------- 0.5/8.9 MB 6.8 MB/s eta 0:00:02
     ------ --------------------------------- 1.4/8.9 MB 11.0 MB/s eta 0:00:01
     --------- ------------------------------ 2.1/8.9 MB 12.3 MB/s eta 0:00:01
     ------------ --------------------------- 2.8/8.9 MB 13.0 MB/s eta 0:00:01
     ---------------- ----------------------- 3.6/8.9 MB 13.4 MB/s eta 0:00:01
     -------------------- ------------------- 4.5/8.9 MB 14.5 MB/s eta 0:00:01
     ----------------------- ---------------- 5.3/8.9 MB 14.7 MB/s eta 0:00:01
     --------------------------- ------------ 6.2/8.9 MB 15.1 MB/s eta 0:00:01
     ------------------------------ --------- 6.8/8.9 MB 14.5 MB/s eta 0:00:01
     ---------------------------------- ----- 7.6/8.9 MB 15.1 MB/s eta


[notice] A new release of pip is available: 23.0.1 -> 25.3
[notice] To update, run: C:\Users\Premithius\solar_jupyter\Scripts\python.exe -m pip install --upgrade pip


In [15]:
import os
import math
import json
import requests
from io import BytesIO
from pathlib import Path
import mercantile
import numpy as np
import pandas as pd
from PIL import Image
from shapely.geometry import Polygon
import cv2
import matplotlib.pyplot as plt

import torch
from ultralytics import YOLO

# Square metres represented by one mask pixel; run_yolo_excel_pipeline multiplies
# the summed polygon area (in pixels) by this factor.
# NOTE(review): 0.04 m²/px corresponds to 0.2 m per pixel, while mask_area_m2
# defaults to 0.3 m per pixel — confirm which ground resolution is correct.
PIXEL_TO_SQM = 0.04
# Value written to the "buffer_radius_sqft" field of each result.json payload.
BUFFER_RADIUS_SQFT = 1200

In [16]:
# Exact conversion factor: 1 ft = 0.3048 m, so 1 sq ft = 0.3048**2 m².
# Defined here because no other cell in the notebook provides it.
SQFT_TO_SQM = 0.09290304


def sqft_area_to_radius_m(area_sqft):
    """Convert the area of a circle given in square feet to its radius in metres.

    Args:
        area_sqft: Circle area in square feet (must be non-negative).

    Returns:
        Radius in metres as a float.

    Raises:
        ValueError: If ``area_sqft`` is negative (raised by ``math.sqrt``).
    """
    area_m2 = area_sqft * SQFT_TO_SQM
    return math.sqrt(area_m2 / math.pi)


def meters_to_lat_lon(dx, dy, lat):
    """Translate a metric offset into latitude/longitude deltas in degrees.

    Args:
        dx: Offset along the longitude axis, in metres.
        dy: Offset along the latitude axis, in metres.
        lat: Latitude (degrees) at which the longitude scale is evaluated.

    Returns:
        Tuple ``(dlat, dlon)`` in degrees.
    """
    metres_per_degree = 111320.0
    # A degree of longitude shrinks with the cosine of the latitude.
    lon_scale = metres_per_degree * math.cos(math.radians(lat))
    return dy / metres_per_degree, dx / lon_scale

In [17]:
# URL template for ESRI World Imagery map tiles; fetch_imagery fills in the
# zoom level {z} and the tile row/column {y}/{x}.
ESRI_TILE_URL = (
    "https://services.arcgisonline.com/ArcGIS/rest/services/"
    "World_Imagery/MapServer/tile/{z}/{y}/{x}"
)
# HTTP headers sent with every tile request (a browser-style User-Agent).
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
}

def fetch_imagery(lat, lon, out_path, zoom=19, grid_size=3):
    """Download a square grid of ESRI imagery tiles around a point and save it as one image.

    The tile returned by ``mercantile.tile(lon, lat, zoom)`` is used as the
    reference tile; ``grid_size`` x ``grid_size`` tiles around it are fetched,
    stitched together, resized to 640x640 and written to ``out_path``.

    Args:
        lat: Latitude of the point of interest.
        lon: Longitude of the point of interest.
        out_path: Destination file for the stitched image.
        zoom: Tile zoom level.
        grid_size: Number of tiles per side of the mosaic (must be >= 1).

    Returns:
        ``out_path`` on success, or ``None`` when any tile request fails
        (network error or non-200 status). Nothing is written on failure.

    Raises:
        ValueError: If ``grid_size`` is smaller than 1.
    """
    if grid_size < 1:
        raise ValueError("grid_size must be at least 1")

    center_tile = mercantile.tile(lon, lat, zoom)
    half = grid_size // 2
    # Exactly grid_size offsets per axis. Iterating -half..half would request
    # grid_size + 1 tiles per axis for even sizes, and the surplus tiles would
    # be pasted outside the canvas and silently clipped.
    offsets = range(-half, grid_size - half)
    tile_images = {}
    tile_size = None

    for dx in offsets:
        for dy in offsets:
            url = ESRI_TILE_URL.format(z=zoom, x=center_tile.x + dx, y=center_tile.y + dy)
            try:
                r = requests.get(url, headers=HEADERS, timeout=10)
            except requests.RequestException:
                # Timeouts/connection errors are reported like a bad status.
                return None
            if r.status_code != 200:
                return None
            img = Image.open(BytesIO(r.content)).convert("RGB")
            tile_images[(dx, dy)] = img
            tile_size = img.size[0]

    stitched = Image.new("RGB", (tile_size * grid_size, tile_size * grid_size))
    for (dx, dy), img in tile_images.items():
        # Shift offsets so the first column/row lands at pixel 0.
        stitched.paste(img, ((dx + half) * tile_size, (dy + half) * tile_size))

    stitched = stitched.resize((640, 640))
    stitched.save(out_path)
    return out_path
class SolarSegmentationModel:
    """Segmentation wrapper that takes an in-memory PIL image.

    NOTE(review): a later cell defines another class with the same name whose
    ``predict_mask`` takes a file path; when the notebook runs top to bottom
    that definition replaces this one.
    """

    def __init__(self, model_path=None, device='cpu'):
        """Load YOLO weights from ``model_path``; without a path no model is loaded.

        Args:
            model_path: Path to the YOLO weights, or ``None`` for the dummy fallback.
            device: Device string forwarded to ``YOLO.predict``.
        """
        if model_path:
            self.model = YOLO(model_path)
        else:
            self.model = None
        self.device = device

    def predict_mask(self, image: "Image.Image"):
        """Return a binary uint8 mask (1 = detected) for ``image``.

        Without a model, a dummy 256x256 mask marking dark pixels (mean
        channel value below 128) is returned. With a model, the union of all
        predicted instance masks is returned at the image's own resolution.
        """
        if self.model is None:
            # fallback dummy mask
            arr = np.array(image.resize((256, 256))).mean(axis=2)
            return (arr < 128).astype(np.uint8)

        # Pass the PIL image itself: Ultralytics treats PIL input as RGB but
        # numpy arrays as BGR, so np.array(image) would swap the channels.
        results = self.model.predict(image, conf=0.3, device=self.device)
        target_shape = (image.height, image.width)
        mask = np.zeros(target_shape, dtype=np.uint8)
        for r in results:
            masks = getattr(r, "masks", None)
            if masks is None:
                continue
            data = masks.data  # one mask per detected instance
            if data.shape[0] == 0:
                continue
            if tuple(data.shape[-2:]) != target_shape:
                # Bring masks to image resolution so they can be merged.
                # NOTE(review): a plain resize assumes the model input was not
                # letterbox-padded — confirm for non-square images.
                data = torch.nn.functional.interpolate(
                    data[None].float(), size=target_shape, mode="nearest"
                )[0]
            # Merge every instance, not only the first one.
            merged = (data.cpu().numpy() > 0.5).any(axis=0)
            mask = np.maximum(mask, merged.astype(np.uint8))
        return mask

In [18]:
class SolarSegmentationModel:
    """YOLO segmentation wrapper that works on image files."""

    def __init__(self, model_path: str):
        """Load the YOLO weights found at ``model_path``."""
        self.model = YOLO(model_path)

    def predict_mask(self, image_path: str):
        """Run the model on ``image_path`` and merge all instance masks.

        Returns:
            A uint8 array with 1 wherever any instance mask is positive, or
            ``None`` when the first result carries no masks.
        """
        first = self.model(image_path, conf=0.25, iou=0.5)[0]
        if first.masks is None:
            return None
        stacked = first.masks.data.cpu().numpy()
        # Collapse the instance axis; any positive pixel counts as detected.
        return (stacked.sum(axis=0) > 0).astype(np.uint8)

In [19]:
def mask_area_m2(mask, meters_per_pixel=0.3):
    """Estimate the area covered by a mask in square metres.

    Args:
        mask: Array whose element sum is taken as the pixel count, or ``None``.
        meters_per_pixel: Side length of one pixel in metres.

    Returns:
        ``0.0`` for a missing mask, otherwise the mask sum multiplied by the
        area of one pixel, as a float.
    """
    if mask is None:
        return 0.0
    return float(mask.sum() * meters_per_pixel ** 2)

In [20]:
def qc_status(image, mask):
    """Classify a sample as verifiable or not.

    Returns:
        ``"NOT_VERIFIABLE"`` when either the image or the mask is missing,
        otherwise ``"VERIFIABLE"``.
    """
    inputs_missing = image is None or mask is None
    if inputs_missing:
        return "NOT_VERIFIABLE"

    status = "VERIFIABLE"
    if mask.sum() < 20:
        # NOTE(review): very small masks get the same status as any other
        # mask, so this check currently changes nothing — confirm whether a
        # different status was intended here.
        status = "VERIFIABLE"
    return status


In [21]:
def process_sample(row, model, output_dir):
    """Fetch imagery for one sample, segment it and write the per-sample outputs.

    Writes ``image.png`` (when imagery could be fetched), ``mask.png`` (when
    the model returned a mask) and ``result.json`` into ``output_dir/<sample_id>``.

    Args:
        row: Mapping/Series with ``sample_id``, ``latitude`` and ``longitude``.
        model: Object exposing ``predict_mask(image_path: str)``.
        output_dir: Root output directory (string or ``Path``).

    Returns:
        The result dictionary that was written to ``result.json``.
    """
    sid = str(row['sample_id'])
    # Plain floats keep json.dump working when the row holds numpy integers.
    lat, lon = float(row['latitude']), float(row['longitude'])

    out_dir = Path(output_dir) / sid
    out_dir.mkdir(parents=True, exist_ok=True)

    img_path = out_dir / "image.png"
    image = None
    mask = None
    # fetch_imagery returns None on failure; in that case no (fresh) image
    # exists, so skip inference and let the QC step report NOT_VERIFIABLE.
    if fetch_imagery(lat, lon, img_path) is not None:
        with Image.open(img_path) as opened:
            image = opened.copy()  # detach from the file handle before it closes
        mask = model.predict_mask(str(img_path))

    area_m2 = mask_area_m2(mask)

    if area_m2 > 1.0:
        presence = True
        buffer_used = 1200
    else:
        presence = area_m2 > 0
        # NOTE(review): 2400 is reported here although no second pass with a
        # larger buffer is performed — confirm the intended behaviour.
        buffer_used = 2400

    qc = qc_status(image, mask)

    if mask is not None:
        # Scale the 0/1 mask to 0/255 so it is visible as an image.
        Image.fromarray(mask * 255).save(out_dir / "mask.png")

    result = {
        "sample_id": sid,
        "latitude": lat,
        "longitude": lon,
        "pv_present": presence,
        "buffer_sqft": buffer_used,
        "pv_area_m2": round(area_m2, 2),
        "qc_status": qc
    }

    with open(out_dir / "result.json", "w") as f:
        json.dump(result, f, indent=2)

    return result

In [24]:
def _extract_detections(result, min_contour_area=100):
    """Convert one YOLO segmentation result into polygon detections.

    Returns a tuple ``(detections, total_area_px)`` where each detection holds
    the class label, confidence, polygon area in pixels and polygon vertices.
    """
    detections = []
    total_area_px = 0
    if result.masks is None:
        return detections, total_area_px

    for i, mask in enumerate(result.masks.data):
        mask_np = (mask.cpu().numpy() > 0.5).astype(np.uint8)
        contours, _ = cv2.findContours(
            mask_np,
            cv2.RETR_EXTERNAL,
            cv2.CHAIN_APPROX_SIMPLE
        )
        for cnt in contours:
            # Drop tiny blobs.
            if cv2.contourArea(cnt) < min_contour_area:
                continue
            points = cnt.reshape(-1, 2)  # (N, 1, 2) -> (N, 2)
            area_px = int(Polygon(points).area)
            detections.append({
                "class": "solar_panel",
                "confidence": round(float(result.boxes.conf[i]), 3),
                "area_pixels": area_px,
                "polygon": points.tolist()
            })
            total_area_px += area_px
    return detections, total_area_px


def run_yolo_excel_pipeline(
    excel_path,
    model_path,
    output_dir,
    imgsz=640,
    conf=0.3
):
    """
    Run YOLO segmentation for every sample listed in an Excel sheet.

    Excel: sample_id | latitude | longitude
    Uses existing ESRI fetch_imagery() function

    For each sample a directory ``output_dir/<sample_id>`` is created holding
    ``image.png``, ``mask.png``, the YOLO output and ``result.json``. A row
    per sample (verifiable or not) is collected and written to
    ``output_dir/summary.xlsx``.

    Args:
        excel_path: Path to the input workbook.
        model_path: Path to the YOLO segmentation weights.
        output_dir: Root directory for all outputs.
        imgsz: Inference image size forwarded to ``model.predict``.
        conf: Confidence threshold forwarded to ``model.predict``.

    Returns:
        The summary ``DataFrame`` that was written to ``summary.xlsx``.
    """
    df = pd.read_excel(excel_path)
    model = YOLO(model_path)

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Kept so the output tree stays the same; nothing below writes into it.
    (output_dir / "yolo_results").mkdir(parents=True, exist_ok=True)

    summary = []

    for _, row in df.iterrows():
        sid = str(row["sample_id"])
        lat = float(row["latitude"])
        lon = float(row["longitude"])

        print(f"Processing {sid}")

        sample_dir = output_dir / sid
        sample_dir.mkdir(parents=True, exist_ok=True)
        result_json_path = sample_dir / "result.json"

        base_payload = {
            "sample_id": int(sid) if sid.isdigit() else sid,
            "lat": lat,
            "lon": lon,
            "has_solar": False,
            "confidence": 0.0,
            "pv_area_sqm_est": 0.0,
            "buffer_radius_sqft": BUFFER_RADIUS_SQFT,
            "qc_status": "PENDING",
            "bbox_or_mask": None,
            "image_metadata": {
                "source": "ESRI",
                "capture_date": "UNKNOWN"
            }
        }

        # Written up front so a PENDING record exists even if a later step fails.
        with open(result_json_path, "w") as f:
            json.dump(base_payload, f, indent=2)

        image_path = sample_dir / "image.png"
        img = None
        std_val = 0.0
        # fetch_imagery returns None when tiles could not be downloaded; the
        # sample is then handled like unusable imagery instead of crashing on
        # a missing (or stale) image file.
        if fetch_imagery(lat, lon, image_path) is not None:
            with Image.open(image_path) as opened:
                img = opened.convert("RGB")
            img_np = np.array(img)
            mean_val = img_np.mean()
            std_val = img_np.std()
            print(f"Image stats — mean: {mean_val:.2f}, std: {std_val:.2f}")

        # Nearly uniform pixel values are treated as unusable imagery.
        if img is None or std_val < 7:
            base_payload.update({
                "qc_status": "NOT_VERIFIABLE",
                "reason": "Imagery unavailable or low detail"
            })

            with open(result_json_path, "w") as f:
                json.dump(base_payload, f, indent=2)

            summary.append({
                "sample_id": sid,
                "latitude": lat,
                "longitude": lon,
                "pv_present": False,
                "num_panels": 0,
                "total_area_pixels": 0,
                "qc_status": "NOT_VERIFIABLE"
            })
            print("Imagery invalid skipping yolo")
            continue

        results = model.predict(
            source=image_path,
            imgsz=imgsz,
            conf=conf,
            save=True,
            project=sample_dir,
            name="yolo",
            exist_ok=True
        )
        detections, total_area_px = _extract_detections(results[0])

        # NOTE(review): polygon coordinates come from the mask tensor; they
        # line up with the image only while both share the same resolution
        # — confirm when imgsz differs from the saved image size.
        mask_img = np.zeros(
            (img.height, img.width),
            dtype=np.uint8
        )
        for d in detections:
            pts = np.array(d["polygon"], dtype=np.int32)
            cv2.fillPoly(mask_img, [pts], 255)
        Image.fromarray(mask_img).save(sample_dir / "mask.png")

        has_solar = len(detections) > 0

        if has_solar:
            max_conf = max(d["confidence"] for d in detections)
            area_sqm = round(total_area_px * PIXEL_TO_SQM, 2)

            base_payload.update({
                "has_solar": True,
                "confidence": round(max_conf, 3),
                "pv_area_sqm_est": area_sqm,
                "qc_status": "VERIFIABLE",
                "bbox_or_mask": detections[0]["polygon"]
            })
        else:
            base_payload.update({
                "has_solar": False,
                "confidence": 0.0,
                "pv_area_sqm_est": 0.0,
                "qc_status": "VERIFIABLE",
                "bbox_or_mask": None
            })
        with open(result_json_path, "w") as f:
            json.dump(base_payload, f, indent=2)

        # Record processed samples too; otherwise the summary would list only
        # the samples that failed the imagery check.
        summary.append({
            "sample_id": sid,
            "latitude": lat,
            "longitude": lon,
            "pv_present": has_solar,
            "num_panels": len(detections),
            "total_area_pixels": total_area_px,
            "qc_status": "VERIFIABLE"
        })

    summary_df = pd.DataFrame(summary)
    summary_df.to_excel(output_dir / "summary.xlsx", index=False)
    print("All samples processed using ESRI imagery")
    return summary_df

In [None]:
# Run the full pipeline; all three paths are relative to the notebook's
# working directory.
summary_df = run_yolo_excel_pipeline(
    excel_path="samples.xlsx",
    model_path="yolov8s-seg-solar-panels/best.pt",
    output_dir="outputs"
)
# Show the first rows of the summary as the cell output.
summary_df.head()

In [None]:
import mlflow
import mlflow.pytorch
# NOTE(review): `num_epochs`, `loss`, `f1`, `rmse` and `model` are not defined
# by any cell visible in this notebook; they must exist before this cell runs.
# As written, the same metric values are logged at every step — confirm that
# they are meant to be updated per epoch.
# The context manager ends the run even when the body raises, so a failed
# execution does not leave an active run behind that blocks the next attempt.
with mlflow.start_run(run_name="solar_panel_training"):
    for epoch in range(num_epochs):
        mlflow.log_metric("loss", loss, step=epoch)
        mlflow.log_metric("f1_score", f1, step=epoch)
        mlflow.log_metric("rmse", rmse, step=epoch)
    mlflow.pytorch.log_model(model, "model")