##### AI Solar Roof Potential Analyzer Web App

This Streamlit web application analyzes solar roof potential using satellite imagery. Key features include:

- Upload and crop satellite images of rooftops
- Generate heat maps of suitable areas for solar panels using a pre-trained segmentation model
- Calculate potential solar energy production based on roof area and location
- Visualize monthly energy production and cost savings
- Compare current electricity bills with potential solar savings

The app provides an interactive interface for users to assess the viability of solar panel installation on their roofs.

In [None]:
import streamlit as st
import torch
import cv2
import numpy as np
import albumentations as A
from PIL import Image
import segmentation_models_pytorch as smp
import torch.nn as nn
import time
import pandas as pd
import webbrowser
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objs as go

In [None]:
# Page title rendered at the top of the Streamlit app.
st.title('AI Solar Roof Potential Analyzer')

In [None]:
# Side length (pixels) that preprocess_image resizes its input to.
IMG_SIZE = 320
# Area credited to every non-zero heat-map pixel; the UI labels the total "sq. m".
PIXEL_AREA = 0.2
# Baseline panel efficiency used by calculate_monthly_solar_energy.
PANEL_EFFICIENCY = 0.15
# Fractional efficiency change per unit of temperature away from 25.
TEMP_COEF = -0.005
# Multiplier that turns the energy figures into the savings shown in Rs.
ELEC_RATE = 7

In [None]:
# Site main() points users to for obtaining rooftop satellite imagery.
USGS_WEBSITE = 'https://earthexplorer.usgs.gov/'

In [None]:
# Encoder backbone and pre-trained weight set handed to smp.Unet.
# NOTE(review): the training section of this file builds its U-Net with
# 'timm-efficientnet-b3' and saves 'best-model.pt', while this app uses b4 and
# loads 'vgg_best-model.pt' -- confirm the checkpoint matches this encoder.
ENCODER = 'timm-efficientnet-b4'
WEIGHTS = 'imagenet'

In [None]:
class SegmentationModel(nn.Module):
    """Thin wrapper around an smp.Unet that outputs one channel of raw logits."""

    def __init__(self):
        super().__init__()
        # activation=None: the caller applies sigmoid itself.
        unet_options = dict(
            encoder_name=ENCODER,
            encoder_weights=WEIGHTS,
            in_channels=3,
            classes=1,
            activation=None,
        )
        self.backbone = smp.Unet(**unet_options)

    def forward(self, images):
        """Run the U-Net on ``images`` and return its logits unchanged."""
        logits = self.backbone(images)
        return logits

In [None]:
@st.cache_resource
def load_model():
    """Build the segmentation network, load its checkpoint on CPU, return it in eval mode.

    Wrapped in ``st.cache_resource`` so the result is cached by Streamlit.
    """
    cpu = torch.device('cpu')
    state_dict = torch.load('vgg_best-model.pt', map_location=cpu)

    network = SegmentationModel()
    network.load_state_dict(state_dict)
    network.eval()
    return network

In [None]:
# Module-level model instance used by main() for inference.
model = load_model()

In [None]:
def preprocess_image(image):
    """Turn an image array into a normalised model input batch.

    Args:
        image: image as an array, either H x W (grayscale) or H x W x C.
            Grayscale input is replicated to three channels and any channel
            beyond the third (e.g. alpha) is dropped, because the model is
            built with ``in_channels=3``.

    Returns:
        A tuple ``(batch, original_size)`` where ``batch`` is a float tensor of
        shape (1, 3, IMG_SIZE, IMG_SIZE) scaled by 1/255 and ``original_size``
        is the input's ``(width, height)`` -- the order ``cv2.resize`` (and
        therefore ``postprocess_mask``) expects.
    """
    image = np.asarray(image)
    if image.ndim == 2:
        image = np.stack([image] * 3, axis=-1)
    elif image.shape[2] > 3:
        image = np.ascontiguousarray(image[:, :, :3])

    # Record the size BEFORE resizing; previously the resized array's
    # shape[1:] -- i.e. (IMG_SIZE, 3) -- was returned, which is not a size.
    height, width = image.shape[:2]

    resized = A.Compose([A.Resize(IMG_SIZE, IMG_SIZE)])(image=image)['image']
    chw = np.transpose(resized, (2, 0, 1)).astype(np.float32)
    batch = (torch.Tensor(chw) / 255.0).unsqueeze(0)
    return batch, (width, height)

In [None]:
def postprocess_mask(mask, original_size):
    """Convert a predicted mask tensor to a numpy array resized to ``original_size``.

    ``original_size`` is forwarded to ``cv2.resize`` as its target size;
    nearest-neighbour interpolation is used.
    """
    mask_array = mask.squeeze().cpu().numpy()
    return cv2.resize(mask_array, original_size, interpolation=cv2.INTER_NEAREST)

In [None]:
def roof_area_calculate(mask):
    """Return PIXEL_AREA times the number of pixels that are not pure [0, 0, 0]."""
    is_coloured = np.any(mask != [0, 0, 0], axis=-1)
    coloured_pixels = np.sum(is_coloured)
    return PIXEL_AREA * coloured_pixels

In [None]:
def apply_heat_map(mask):
    """Colour a 2-D mask with a left-to-right orange-to-yellow gradient.

    Each output pixel is the gradient colour for its column multiplied by the
    mask value at that pixel, so pixels where the mask is 0 stay black.
    Returns a uint8 array of shape ``(*mask.shape, 3)``.
    """
    n_rows, n_cols = mask.shape[0], mask.shape[1]
    # Blend weight per column (0 at the left edge, 1 at the right), repeated per row.
    blend = np.tile(np.linspace(0, 1, n_cols), (n_rows, 1))

    orange = np.array([255, 125, 0], dtype=np.uint8)
    yellow = np.array([255, 255, 0], dtype=np.uint8)

    channels = [
        np.uint8((start * (1 - blend) + end * blend) * mask)
        for start, end in zip(orange, yellow)
    ]
    return np.stack(channels, axis=-1)

In [None]:
def load_states(csv_path='Datasets/solar_irradiance.csv'):
    """Return the title-cased names found in the first column of a CSV file.

    Args:
        csv_path: CSV file to read; defaults to the irradiance dataset.

    Returns:
        List of first-column values converted with ``str.title()``. Rows whose
        first cell is empty are skipped instead of raising on ``NaN.title()``.
    """
    first_column = pd.read_csv(csv_path).iloc[:, 0].dropna()
    return [str(name).title() for name in first_column]

In [None]:
def load_temperature_data(csv_path='Datasets/temperature.csv'):
    """Read the temperature table into a DataFrame.

    Args:
        csv_path: CSV file to read; defaults to the bundled temperature dataset.
    """
    return pd.read_csv(csv_path)

In [None]:
def load_irradiance_data(csv_path='Datasets/solar_irradiance.csv'):
    """Read the solar-irradiance table into a DataFrame.

    Args:
        csv_path: CSV file to read; defaults to the bundled irradiance dataset.
    """
    return pd.read_csv(csv_path)

In [None]:
# Loaded once at import time; calculate_monthly_solar_energy reads both tables.
temperature_df = load_temperature_data()
irradiance_df = load_irradiance_data()

In [None]:
def calculate_monthly_solar_energy(state, roof_area):
    """Estimate per-month solar energy for a roof in the given state.

    Args:
        state: state name; it is upper-cased and matched against the first
            column of ``temperature_df`` and ``irradiance_df``.
        roof_area: roof area used as a linear factor in the estimate.

    Returns:
        One value per data column of the matching rows, in column order.

    Raises:
        ValueError: if the state is absent from either table (previously this
            surfaced as an opaque ``IndexError`` from ``.iloc[0]``).
    """
    key = state.upper()
    temp_rows = temperature_df[temperature_df.iloc[:, 0] == key]
    irrd_rows = irradiance_df[irradiance_df.iloc[:, 0] == key]
    if temp_rows.empty or irrd_rows.empty:
        raise ValueError(f"No temperature/irradiance data available for state {state!r}")

    state_temp = temp_rows.iloc[0, 1:].tolist()
    state_irrd = irrd_rows.iloc[0, 1:].tolist()

    monthly_energy = []
    for temp, irrd in zip(state_temp, state_irrd):
        # Efficiency is derated linearly with temperature relative to 25.
        eff = PANEL_EFFICIENCY * (1 + TEMP_COEF * (temp - 25))
        # Every month is treated as 30 days.
        solar_energy = irrd * eff * roof_area * 30
        # NOTE(review): the floor-division by 100 is kept from the original;
        # its purpose (unit conversion? calibration?) is not documented -- confirm.
        monthly_energy.append(solar_energy // 100)

    return monthly_energy

In [None]:
_MONTHS = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']


def _render_sidebar():
    """Draw the sidebar contact form; the entered values are not stored anywhere."""
    with st.sidebar:
        st.header("Share your Contact info: ")
        st.sidebar.image("satellite.jpg", use_column_width=True)

        st.text_input("Enter your name:")

        col1, col2 = st.columns([2, 1])
        with col1:
            st.slider("Age: ", 1, 100, 50)
        with col2:
            st.selectbox("Gender", ["M", "F"])

        st.text_input("Enter your mail id:")

        # No backend is wired up yet, so pressing Submit only triggers a rerun.
        st.button("Submit")


def _select_crop(image):
    """Show crop sliders beside a preview; return (left, top, right, bottom) or None.

    The slider bounds guarantee right > left and bottom > top, so the crop is
    never empty. Images too small for that return None after showing an error.
    """
    width, height = image.size
    if width < 3 or height < 3:
        st.error("The uploaded image is too small to crop.")
        return None

    col1, col2 = st.columns(2)

    with col2:
        st.subheader("Crop Parameters")
        # Left/top stop two pixels short so the right/bottom sliders always
        # keep a non-degenerate range.
        left = st.slider("Left", 0, width - 2, 0)
        top = st.slider("Top", 0, height - 2, 0)
        right = st.slider("Right", left + 1, width, width)
        bottom = st.slider("Bottom", top + 1, height, height)

    with col1:
        st.subheader("Image Preview")
        fig, ax = plt.subplots(figsize=(10, 10))
        ax.imshow(np.array(image))
        ax.add_patch(plt.Rectangle((left, top), right - left, bottom - top,
                                   fill=False, edgecolor='red', linewidth=2))
        ax.axis('off')
        st.pyplot(fig)
        # Streamlit re-runs the script on every interaction; close the figure
        # so they do not accumulate in memory.
        plt.close(fig)

    return left, top, right, bottom


def _segment_roof(cropped_image):
    """Run the model on the crop; return (heat-map image, roof area)."""
    preprocessed_image, _ = preprocess_image(np.array(cropped_image))

    with torch.no_grad():
        logits = model(preprocessed_image)
        pred_mask = (torch.sigmoid(logits) > 0.5).float()

    # PIL's .size is (width, height), the order postprocess_mask forwards to cv2.
    resized_mask = postprocess_mask(pred_mask, cropped_image.size)
    heat_map_mask = apply_heat_map(resized_mask)
    return heat_map_mask, roof_area_calculate(heat_map_mask)


def _plot_monthly_energy(monthly_energy):
    """Line chart of the estimated energy production per month."""
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=_MONTHS, y=monthly_energy, mode='lines+markers', name='Energy Production'))
    fig.update_layout(
        title='Monthly Solar Energy Production of your house',
        xaxis_title='Months',
        yaxis_title='Energy Production (kWh)',
        height=350
    )
    st.plotly_chart(fig)


def _plot_monthly_bill(current_bill, monthly_energy):
    """Line chart comparing the current monthly bill with the bill after solar savings."""
    baseline = current_bill / 12
    monthly_bill = [baseline - energy * ELEC_RATE for energy in monthly_energy]

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=_MONTHS,
        y=[baseline] * len(_MONTHS),
        mode='lines',
        name='Current Monthly Bill',
        line=dict(color='red', width=2, dash='dash')
    ))
    fig.add_trace(go.Scatter(
        x=_MONTHS,
        y=monthly_bill,
        mode='lines+markers',
        name='Solar Monthly Bill',
        marker=dict(color='green', size=10),
        line=dict(color='green', width=2)
    ))
    fig.update_layout(
        title='Monthly Bill After Solar Savings of you house',
        xaxis_title='Months',
        yaxis_title='Monthly Bill (Rs.)',
        height=300
    )
    st.plotly_chart(fig)


def _plot_annual_bill(current_bill, solar_bill):
    """Bar chart of the annual bill without and with solar."""
    fig = go.Figure()
    fig.add_trace(go.Bar(
        x=['Current Bill', 'Solar Bill'],
        y=[current_bill, solar_bill],
        name='Data',
        marker=dict(color='orange')
    ))
    fig.update_layout(
        title='Annual Electricity Bill',
        xaxis_title='',
        yaxis_title='Electric Bill',
        height=500
    )
    st.plotly_chart(fig)


def main():
    """Render the app: inputs, image upload and crop, segmentation, energy and bill charts.

    Fixes relative to the previous version:
      * a bill of 0 (the default) no longer crashes on an unbound
        ``energy_savings``; the bill comparison is simply skipped;
      * the annual "Solar Bill" bar now shows the bill after savings and the
        "Savings" message shows the savings (they were swapped);
      * uploads are converted to RGB so RGBA/grayscale files reach the
        3-channel model in a valid shape;
      * the artificial ``time.sleep(2)`` delay is gone;
      * the USGS site is a hyperlink, since ``webbrowser`` opens the tab on
        the machine running the server rather than in the visitor's browser.
    """
    states = load_states()
    _render_sidebar()

    st.success('We are here to make Bharat a GREEN country')
    st.markdown("""---""")

    col1, col2 = st.columns([3, 1])
    with col1:
        st.subheader("Get satellite imagery of your rooftop :satellite::")
    with col2:
        st.markdown(f"[USGS Earth Explorer]({USGS_WEBSITE})")

    col1, col2 = st.columns(2)
    with col1:
        selected_state = st.selectbox("Select a State", states)
    with col2:
        current_bill = st.number_input('Enter your Current Annual Electric bill (in Rs.)', min_value=0, max_value=10000000, value=0)

    uploaded_file = st.file_uploader("Upload aerial imagery", type=["jpg", "jpeg", "png", "tif"])
    if uploaded_file is None:
        return

    image = Image.open(uploaded_file).convert('RGB')
    crop_box = _select_crop(image)
    if crop_box is None:
        return

    if not st.button("Generate Segmentation Mask"):
        return

    cropped_image = image.crop(crop_box)
    with st.spinner('Generating heatmap...'):
        heat_map_mask, roof_area = _segment_roof(cropped_image)

    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Rooftop")
        st.image(cropped_image, use_column_width=True)
    with col2:
        st.subheader("Generated Heatmap")
        st.image(heat_map_mask, use_column_width=True, clamp=True)

    st.info(f"Roof Area: {roof_area:.2f} sq. m")

    monthly_energy = calculate_monthly_solar_energy(selected_state, roof_area)
    _plot_monthly_energy(monthly_energy)

    total_annual_energy = sum(monthly_energy)
    energy_savings = total_annual_energy * ELEC_RATE
    has_bill = current_bill > 0

    if has_bill:
        _plot_monthly_bill(current_bill, monthly_energy)

    col1, col2 = st.columns(2)

    with col1:
        if has_bill:
            # Not clamped at zero, matching the monthly chart, where a negative
            # bill means production is worth more than the bill.
            _plot_annual_bill(current_bill, current_bill - energy_savings)
        else:
            st.info("Enter your current annual electric bill to compare it with the solar bill.")

    with col2:
        st.info(
            f"""
            Your State: {selected_state}

            Your Roof Area: {roof_area:.2f} sq. m

            Total Solar Panel area: {(roof_area*0.8):.2f} sq. m

            Total Annual Solar Energy Production: {total_annual_energy:.2f} kWh
            """)

        st.success(f"Estimated Annual Energy Savings: Rs. {energy_savings:.2f}")

In [None]:
# Build the page only when this file is executed as the main script.
if __name__ == "__main__":
    main()

---
##### Rooftop Segmentation Model Training

This script trains a deep learning model for rooftop segmentation in satellite imagery. Key components include:

- Data loading and cleaning from a CSV file
- Image augmentation using albumentations library
- Custom dataset and dataloader creation
- U-Net model implementation using segmentation_models_pytorch
- Training loop with learning rate scheduling
- Model evaluation and visualization of results

The trained model can be used to identify suitable areas for solar panel installation in satellite images.

In [None]:
import os
import sys
import cv2
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
from tqdm import tqdm
import albumentations as A
import matplotlib.pyplot as plt
import segmentation_models_pytorch as smp
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from segmentation_models_pytorch.losses import DiceLoss, FocalLoss
from torch.optim.lr_scheduler import StepLR

In [None]:
# NOTE(review): machine-specific absolute path; nothing visible in this script
# imports a module from it -- confirm whether it is still needed.
sys.path.append('/home/robosobo/solar_rooftop/Datasets')

In [None]:
# CSV listing the image/mask pairs, read with pd.read_csv below.
CSV_PATH = 'Datasets/solar_train.csv'
# Directory that the CSV's image and mask paths are joined onto.
DATA_DIR = 'Datasets/'

In [None]:
# Number of passes of the training loop.
EPOCHS = 50
# Learning rate given to the AdamW optimizer.
LR = 0.001
# Batch size for both data loaders.
BATCH_SIZE = 15 
# Side length (pixels) every image and mask is resized to.
IMG_SIZE = 320

In [None]:
# Encoder backbone and pre-trained weight set handed to smp.Unet.
# NOTE(review): the app section of this file uses 'timm-efficientnet-b4';
# a checkpoint trained here will only load there if the encoders match.
ENCODER = 'timm-efficientnet-b3'
WEIGHTS = 'imagenet'

In [None]:
# Train on a GPU when one is available and fall back to the CPU otherwise.
# The model and every batch are moved with .to(device), and the checkpoint is
# reloaded with map_location=device, so no other code needs to change.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
def clean_dataset(df, data_dir=None):
    """Return a cleaned copy of ``df`` containing only rows whose files exist.

    Paths in the ``images`` and ``masks`` columns that start with ``images``
    and contain ``_label`` have their leading ``images`` replaced by ``masks``.
    Rows are then kept only when both files exist under ``data_dir``.

    Args:
        df: DataFrame with ``images`` and ``masks`` columns of relative paths.
            It is not modified (the previous version rewrote its columns in
            place).
        data_dir: directory the relative paths are resolved against; defaults
            to the module-level ``DATA_DIR``.

    Returns:
        A new DataFrame with corrected paths and missing-file rows removed.
        Non-string cells (e.g. NaN) are treated as missing files.
    """
    root = DATA_DIR if data_dir is None else data_dir

    def fix_path(path):
        if isinstance(path, str) and path.startswith('images') and '_label' in path:
            return path.replace('images', 'masks', 1)
        return path

    def file_exists(rel_path):
        return isinstance(rel_path, str) and os.path.exists(os.path.join(root, rel_path))

    cleaned = df.copy()
    cleaned['images'] = cleaned['images'].apply(fix_path)
    cleaned['masks'] = cleaned['masks'].apply(fix_path)

    # astype(bool) keeps the mask boolean even for an empty frame, where
    # apply() yields an object-dtype Series.
    keep = (cleaned['images'].apply(file_exists).astype(bool)
            & cleaned['masks'].apply(file_exists).astype(bool))
    return cleaned[keep]

In [None]:
# Load the pair list, drop rows with missing files, then hold out 20% of the
# rows for validation using a fixed random seed.
print("Loading and cleaning dataset...")
df = pd.read_csv(CSV_PATH)
df = clean_dataset(df)
train_df, valid_df = train_test_split(df, test_size=0.2, random_state=42)
print("Dataset loaded and cleaned.")

In [None]:
def get_train_augs():
    """Build the training augmentation pipeline.

    Resize to IMG_SIZE, then random flips, 90-degree rotations, a
    shift/scale/rotate step, and one of brightness-contrast or gamma jitter.
    """
    colour_jitter = A.OneOf(
        [A.RandomBrightnessContrast(p=1), A.RandomGamma(p=1)],
        p=0.5,
    )
    steps = [
        A.Resize(IMG_SIZE, IMG_SIZE),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomRotate90(p=0.5),
        A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.1, rotate_limit=45, p=0.5),
        colour_jitter,
    ]
    return A.Compose(steps)

In [None]:
def get_valid_augs():
    """Build the validation pipeline: a resize to IMG_SIZE and nothing else."""
    resize_only = [A.Resize(IMG_SIZE, IMG_SIZE)]
    return A.Compose(resize_only)

In [None]:
class SegmentationDataset(Dataset):
    """Dataset yielding (image, mask) tensor pairs from a DataFrame of paths.

    ``df`` must expose ``images`` and ``masks`` columns holding paths relative
    to DATA_DIR. A sample that fails to load is reported with ``print`` and
    returned as ``None`` (``collate_fn`` filters those out).
    """

    def __init__(self, df, augmentations):
        self.df = df
        self.augmentations = augmentations

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        try:
            return self._load_pair(idx)
        except Exception as e:
            print(f"Error processing sample {idx}: {str(e)}")
            return None

    def _load_pair(self, idx):
        """Read, augment and tensorise the sample at position ``idx``."""
        record = self.df.iloc[idx]
        image_path = os.path.join(DATA_DIR, record.images)
        mask_path = os.path.join(DATA_DIR, record.masks)

        # Image: read as BGR by OpenCV, converted to RGB.
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image file not found: {image_path}")
        bgr = cv2.imread(image_path)
        if bgr is None:
            raise ValueError(f"Failed to load image: {image_path}")
        image = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

        # Mask: single channel, given an explicit trailing channel axis.
        if not os.path.exists(mask_path):
            raise FileNotFoundError(f"Mask file not found: {mask_path}")
        gray = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if gray is None:
            raise ValueError(f"Failed to load mask: {mask_path}")
        mask = np.expand_dims(gray, axis=-1)

        if self.augmentations:
            transformed = self.augmentations(image=image, mask=mask)
            image, mask = transformed['image'], transformed['mask']

        # Channel-last -> channel-first float32 arrays.
        image_chw = np.transpose(image, (2, 0, 1)).astype(np.float32)
        mask_chw = np.transpose(mask, (2, 0, 1)).astype(np.float32)

        image_tensor = torch.Tensor(image_chw) / 255.0
        # Rounding after scaling makes the mask strictly 0/1.
        mask_tensor = torch.round(torch.Tensor(mask_chw) / 255.0)
        return image_tensor, mask_tensor

In [None]:
def collate_fn(batch):
    """Drop ``None`` samples, then batch the remainder with torch's default collate."""
    usable = [sample for sample in batch if sample is not None]
    return torch.utils.data.dataloader.default_collate(usable)

In [None]:
# Training samples get random augmentation; validation samples are only resized.
print("Creating datasets...")
trainset = SegmentationDataset(train_df, augmentations=get_train_augs())
validset = SegmentationDataset(valid_df, augmentations=get_valid_augs())

In [None]:
# Report how many samples each split holds.
print(f'Length of trainset: {len(trainset)}')
print(f'Length of validset: {len(validset)}')

In [None]:
# Only the training loader shuffles; both use collate_fn to skip failed samples.
print("Creating data loaders...")
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
validloader = DataLoader(validset, batch_size=BATCH_SIZE, collate_fn=collate_fn)

In [None]:
# Report the number of batches per epoch for each loader.
print(f'Total no. of batches in trainloader: {len(trainloader)}')
print(f'Total no. of batches in validloader: {len(validloader)}')

In [None]:
class SegmentationModel(nn.Module):
    """U-Net wrapper that can also compute the training loss.

    Called with images only it returns logits; called with images and masks it
    returns ``(logits, loss)`` where loss = Dice + BCE-with-logits + 0.5 * Focal.
    """

    def __init__(self):
        super().__init__()
        unet_options = dict(
            encoder_name=ENCODER,
            encoder_weights=WEIGHTS,
            in_channels=3,
            classes=1,
            activation=None,
        )
        self.backbone = smp.Unet(**unet_options)

    def forward(self, images, masks=None):
        logits = self.backbone(images)

        # Inference path: no targets, so there is no loss to compute.
        if masks is None:
            return logits

        dice_term = DiceLoss(mode='binary')(logits, masks)
        bce_term = nn.BCEWithLogitsLoss()(logits, masks)
        focal_term = FocalLoss(mode='binary')(logits, masks)
        combined = dice_term + bce_term + 0.5 * focal_term
        return logits, combined

In [None]:
# Instantiate the network and move its parameters to the selected device.
print("Creating model...")
model = SegmentationModel().to(device)
print("Model created.")

In [None]:
def train_fn(dataloader, model, optimizer):
    """Run one training epoch and return the loss averaged over ``len(dataloader)``."""
    model.train()
    running_loss = 0.0

    progress = tqdm(dataloader, desc="Train")
    for batch_images, batch_masks in progress:
        batch_images = batch_images.to(device)
        batch_masks = batch_masks.to(device)

        optimizer.zero_grad()
        _, batch_loss = model(batch_images, batch_masks)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

    batch_count = len(dataloader)
    return running_loss / batch_count

In [None]:
def eval_fn(dataloader, model):
    """Evaluate without gradients and return the loss averaged over ``len(dataloader)``."""
    model.eval()
    running_loss = 0.0

    with torch.no_grad():
        progress = tqdm(dataloader, desc="Valid")
        for batch_images, batch_masks in progress:
            batch_images = batch_images.to(device)
            batch_masks = batch_masks.to(device)
            _, batch_loss = model(batch_images, batch_masks)
            running_loss += batch_loss.item()

    batch_count = len(dataloader)
    return running_loss / batch_count

In [None]:
# AdamW over all model parameters, with weight decay 0.01.
print("Creating optimizer...")
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01)
print("Optimizer created.")

In [None]:
# Multiply the learning rate by 0.1 every 15 scheduler steps (one step per epoch).
print("Creating learning rate scheduler...")
scheduler = StepLR(optimizer, 15, gamma=0.1)
print("Scheduler created.")

In [None]:
# Lowest validation loss seen so far; infinity so the first epoch always saves.
best_loss = float('inf')

In [None]:
# Main loop: train, validate, checkpoint on improvement, then step the scheduler.
print("Starting training loop...")
for i in range(EPOCHS):
    print(f"Epoch {i+1}/{EPOCHS}")
    train_loss = train_fn(trainloader, model, optimizer)
    valid_loss = eval_fn(validloader, model)

    # Keep only the weights with the best validation loss so far.
    if valid_loss < best_loss:
        torch.save(model.state_dict(), 'best-model.pt')
        print("SAVED MODEL")
        best_loss = valid_loss

    # The LR is logged before scheduler.step(), i.e. the rate used this epoch.
    print(f'Epoch: {i+1}, Train Loss: {train_loss:.6f}, Valid Loss: {valid_loss:.6f}, LR: {scheduler.get_last_lr()[0]}')
    
    scheduler.step()

In [None]:
# Marks the end of the training loop in the log output.
print("Training completed.")

In [None]:
# Visualization
print("Starting visualization...")
# Preferred sample is index 25; clamp it so a validation set with fewer than
# 26 rows does not raise IndexError.
idx = min(25, max(len(validset) - 1, 0))

In [None]:
# Restore the best checkpoint saved during training and fetch one validation pair.
# NOTE(review): validset[idx] returns None when the sample fails to load,
# which would make the tuple unpacking below raise.
model.load_state_dict(torch.load('best-model.pt', map_location=device))
image, mask = validset[idx]
image, mask = image.to(device), mask.to(device)

In [None]:
# Inference only: switch to eval mode explicitly (this cell may run without
# eval_fn having been called last) and skip gradient tracking.
model.eval()
with torch.no_grad():
    logits_mask = model(image.unsqueeze(0))
    pred_mask = torch.sigmoid(logits_mask)
    # Threshold the probabilities at 0.5 to get a 0/1 mask.
    pred_mask = (pred_mask > 0.5) * 1.0

In [None]:
# Bring everything back to the CPU before handing it to matplotlib.
image = image.cpu()
mask = mask.cpu()
pred_mask = pred_mask.cpu()

In [None]:
# Show the input image, its ground-truth mask and the prediction side by side.
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
# permute moves the channel axis last, the layout imshow expects.
plt.imshow(image.permute(1, 2, 0))
plt.title('Original Image')
plt.subplot(1, 3, 2)
plt.imshow(mask.squeeze(), cmap='gray')
plt.title('Ground Truth Mask')
plt.subplot(1, 3, 3)
plt.imshow(pred_mask.squeeze().detach().numpy(), cmap='gray')
plt.title('Predicted Mask')
plt.show()
print("Visualization completed.")