In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

import os
import seaborn as sns
import datetime as datetime

# Solar Output Prediction using PyTorch Artificial Neural Network

This notebook implements a simple feedforward neural network in PyTorch to predict `PV(W)`, the solar power output in watts.
Based on the approach used in `big_project_KAN.ipynb`.

In [None]:
# Project directory layout, derived from the current working directory.
# os.path.abspath() resolves the file name against the current working directory
# and does not check that "big_project.ipynb" exists, so notebook_dir is
# effectively the directory the kernel was started in.
notebook_path = os.path.abspath("big_project.ipynb")
# Normalise Windows back-slashes so every derived path uses forward slashes
notebook_dir = os.path.dirname(notebook_path).replace('\\', '/')
print("Current notebook directory:", notebook_dir)
HOME_DIR = f'{notebook_dir}'
DATA_DIR = f'{HOME_DIR}/data/'
MODEL_DIR = f'{HOME_DIR}/model/'
print("Data directory set to:", DATA_DIR)
# DATA_DIR (and SQL_DB_PATH) already end with '/', so the paths derived from
# them below contain a doubled '//' separator.
RAW_DATA_DIR = f'{DATA_DIR}/raw_data/'
TRAIN_DATA_DIR = f'{DATA_DIR}/training_data/'
SQL_DB_PATH = f'{DATA_DIR}/db_sqlite/'
SQL_DB_FILE = f'{SQL_DB_PATH}/big_project_db.sqlite3'
BACKUP_FILE_TYPE = 'feather'  # Options: 'csv', 'feather', 'parquet'

# Meteostat setup
METEOSTAT_CACHE_DIR = f'{DATA_DIR}/meteostat_cache/'
SOLAR_SITE_POSITION = (53.6985, -6.2080)  # Bettystown, Ireland
LATITUDE, LONGITUDE = SOLAR_SITE_POSITION
WEATHER_START_DATE = datetime.datetime(2024, 1, 1)
# Evaluated once, when this cell runs; re-running the cell moves the end date forward
WEATHER_END_DATE = datetime.datetime.now()
# Solar panel configuration
# Determined this using gemini and google maps measurements
# NOTE(review): if azimuth is measured clockwise from north, 65 deg is closer to
# east-north-east than east-south-east - confirm the convention used here.
ROOF_PANE_I_ANGLE = 30  # degrees
ROOF_PANE_II_ANGLE = 30  # degrees
ROOF_PANE_I_AZIMUTH = 65  # degrees ( East-South-East)
ROOF_PANE_II_AZIMUTH = 245  # degrees ( West-South-West)
ROOF_PANE_I_COUNT = 7
ROOF_PANE_II_COUNT = 12
SOLAR_PANEL_POWER_RATING_W = 440  # Watts per panel
# Capacity per roof pane = panel count * per-panel rating; the total is their sum
TOTAL_SOLAR_PANE_I_CAPACITY_W = ROOF_PANE_I_COUNT * SOLAR_PANEL_POWER_RATING_W
TOTAL_SOLAR_PANE_II_CAPACITY_W = ROOF_PANE_II_COUNT * SOLAR_PANEL_POWER_RATING_W
TOTAL_SOLAR_CAPACITY_W = TOTAL_SOLAR_PANE_I_CAPACITY_W + TOTAL_SOLAR_PANE_II_CAPACITY_W

In [None]:
# Rows whose 'Clear sky GHI' is not strictly above this value are dropped when
# the hourly data is loaded (used as the night-time cut-off).
# NOTE(review): unit presumably matches the 'Clear sky GHI' column - confirm.
hourly_nighlty_threshold = 50

In [None]:
# Load the hourly training frame and keep only rows whose 'Clear sky GHI'
# is strictly above hourly_nighlty_threshold.
df_merge_hourly = (
    pd.read_feather(f"{TRAIN_DATA_DIR}/hourly_solar_full_data.feather")
    .loc[lambda frame: frame['Clear sky GHI'] > hourly_nighlty_threshold]
)

print(f"Data loaded: {len(df_merge_hourly)} rows, {len(df_merge_hourly.columns)} columns")

In [None]:
def _columns_with_prefix(frame, prefix):
    """Return the column names of ``frame`` that start with ``prefix``, in column order."""
    return [name for name in frame.columns.tolist() if name.startswith(prefix)]


# Engineered columns are grouped by their 'level1_' / 'level2_' name prefix
level1_features = _columns_with_prefix(df_merge_hourly, 'level1_')
level2_features = _columns_with_prefix(df_merge_hourly, 'level2_')

print(f"Level 1 features: {len(level1_features)}")
print(f"Level 2 features: {len(level2_features)}")

In [None]:
# Column the network learns to predict
target_col = 'PV(W)'

# Model inputs - same as KAN notebook. The fixed columns come first,
# followed by every level2_* column in the order found in the data frame.
feature_cols = [
    # weather
    'Temperature(C)',
    'Humidity(%)',
    'Sunshine Duration',
    'Precipitation(mm)',
    'Dew Point(C)',
    'Wind Direction(deg)',
    'Wind Speed(m/s)',
    'Wind Gust(m/s)',
    'Pressure(hPa)',
    'Wind Cooling',
    # clear-sky power output
    'Power_ClearSky_Pane_I(W)',
    'Power_ClearSky_Pane_II(W)',
    'Total_Power_ClearSky_Output(W)',
    # temporal
    'Month_Sin',
    'DayOfYear_Sin',
    'HourOfDay_Sin',
    *level2_features,
]

print(f"\nTarget: {target_col}")
print(f"Number of features: {len(feature_cols)}")
print(f"Features: {feature_cols}")

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

In [None]:
def _as_float_tensor(array):
    """Copy a NumPy array into a float32 tensor placed on ``device``."""
    return torch.tensor(array, dtype=torch.float32, device=device)


# Keep only rows where every model input and the target are present
model_df = df_merge_hourly.dropna(subset=feature_cols + [target_col])
X = model_df[feature_cols].values
y = model_df[target_col].values.reshape(-1, 1)  # column vector: one target per row

print(f"Dataset shape: X={X.shape}, y={y.shape}")

# Hold out 20% of the rows for evaluation (fixed random_state for a repeatable split).
# NOTE(review): train_test_split shuffles rows by default; if the rows are consecutive
# hours, neighbouring hours can end up on both sides of the split - confirm that a
# random (rather than chronological) split is intended.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

print(f"Training set: X_train={X_train.shape}, y_train={y_train.shape}")
print(f"Test set: X_test={X_test.shape}, y_test={y_test.shape}")

# Standardise inputs and target. Both scalers are fitted on the training
# split only and then applied unchanged to the test split.
scaler_X = StandardScaler()
scaler_y = StandardScaler()

X_train_scaled = scaler_X.fit_transform(X_train)
X_test_scaled = scaler_X.transform(X_test)
y_train_scaled = scaler_y.fit_transform(y_train)
y_test_scaled = scaler_y.transform(y_test)

# Move the scaled arrays onto the compute device as float32 tensors
X_train_tensor = _as_float_tensor(X_train_scaled)
y_train_tensor = _as_float_tensor(y_train_scaled)
X_test_tensor = _as_float_tensor(X_test_scaled)
y_test_tensor = _as_float_tensor(y_test_scaled)

print("\nData prepared and converted to PyTorch tensors")

In [None]:
class SolarPredictionNN(nn.Module):
    """Feedforward regression network: input -> 128 -> 64 -> 32 -> 1.

    Every hidden layer is followed by ReLU. Dropout is applied after the
    first and second hidden layers only; the output layer is linear.

    Args:
        input_dim: Number of input features per sample.
        dropout_p: Dropout probability. Defaults to 0.2, the value this
            notebook has always used, so existing callers are unaffected.
    """

    def __init__(self, input_dim, dropout_p=0.2):
        super().__init__()

        # Layer attribute names (fc1..fc4) are the state_dict keys of saved
        # checkpoints, so they must stay stable.
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 1)

        # Shared, parameter-free modules reused between layers
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, x):
        """Map a ``(batch, input_dim)`` tensor to ``(batch, 1)`` predictions."""
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        # No dropout after the last hidden layer
        x = self.relu(self.fc3(x))
        x = self.fc4(x)
        return x

# Seed PyTorch before the weights are created so that weight initialisation,
# mini-batch shuffling and dropout masks are repeatable on a fresh
# Restart & Run All (42 matches the random_state used for the data split).
torch.manual_seed(42)

# Initialize the model; the input width is the number of feature columns
input_dim = X_train.shape[1]
model = SolarPredictionNN(input_dim).to(device)

print(f"Model initialized with input dimension: {input_dim}")
print(f"\nModel architecture:")
print(model)

In [None]:
# Training hyperparameters. The learning rate is defined once so the printed
# summary below cannot drift from the value the optimizer actually uses.
learning_rate = 0.001
num_epochs = 200
batch_size = 64

# Loss function (mean squared error on the scaled target) and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

print(f"Training configuration:")
print(f"  Loss function: MSE")
print(f"  Optimizer: Adam (lr={learning_rate})")
print(f"  Epochs: {num_epochs}")
print(f"  Batch size: {batch_size}")

In [None]:
# Loss history, one entry per epoch
train_losses = []
test_losses = []

n_train = X_train_tensor.size(0)

print("Starting training...\n")

for epoch in range(num_epochs):
    model.train()

    # Mini-batch training: visit the rows in a fresh random order each epoch
    permutation = torch.randperm(n_train)
    epoch_loss = 0.0

    for i in range(0, n_train, batch_size):
        indices = permutation[i:i+batch_size]
        batch_x, batch_y = X_train_tensor[indices], y_train_tensor[indices]

        # Forward pass
        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # criterion returns the mean over the batch; weight it by the batch
        # size so the (usually shorter) final batch is not over-counted.
        epoch_loss += loss.item() * batch_x.size(0)

    # Per-sample mean training loss for the epoch. It is measured in train
    # mode (dropout active), so it is not strictly comparable with the
    # eval-mode test loss below.
    avg_train_loss = epoch_loss / n_train
    train_losses.append(avg_train_loss)

    # Evaluate on the held-out set (dropout disabled, no gradients tracked)
    model.eval()
    with torch.no_grad():
        test_outputs = model(X_test_tensor)
        test_loss = criterion(test_outputs, y_test_tensor).item()
        test_losses.append(test_loss)

    # Print progress every 20 epochs
    if (epoch + 1) % 20 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.6f}, Test Loss: {test_loss:.6f}")

print("\n✓ Training Complete!")

In [None]:
# Learning curves: per-epoch training and test loss on the scaled target
fig, ax = plt.subplots(figsize=(10, 6))
for history, curve_label in ((train_losses, 'Train Loss'), (test_losses, 'Test Loss')):
    ax.plot(history, label=curve_label, linewidth=2)
ax.set_xlabel('Epoch', fontsize=12)
ax.set_ylabel('Loss (MSE)', fontsize=12)
ax.set_title('Training and Test Loss over Epochs', fontsize=14)
ax.legend(fontsize=12)
ax.grid(True, alpha=0.3)
plt.show()

In [None]:
# Predict the (scaled) target for the test set with dropout disabled
model.eval()
with torch.no_grad():
    y_pred_scaled = model(X_test_tensor).cpu().numpy()

# Undo the target scaling to get back to the original units, then apply the
# physics constraint that power output cannot be negative. The same floor is
# applied to the actual values as well as to the predictions.
y_pred = np.maximum(scaler_y.inverse_transform(y_pred_scaled), 0)
y_test_actual = np.maximum(scaler_y.inverse_transform(y_test_scaled), 0)

print("Predictions generated and inverse-transformed")

In [None]:
# Error metrics in the target's original units
mse = mean_squared_error(y_test_actual, y_pred)
mae = mean_absolute_error(y_test_actual, y_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_test_actual, y_pred)

# Normalized metrics, as a percentage of `capacity`.
# Note: `capacity` here is the largest actual value in the test set, not the
# installed nameplate capacity.
capacity = y_test_actual.max()
n_mae = (mae / capacity) * 100
n_rmse = (rmse / capacity) * 100

report_lines = [
    "="*50,
    "Model Performance Metrics",
    "="*50,
    f"MAE:    {mae:.2f} W",
    f"RMSE:   {rmse:.2f} W",
    f"R²:     {r2:.4f}",
    f"N-MAE:  {n_mae:.2f}%",
    f"N-RMSE: {n_rmse:.2f}%",
    "="*50,
]
print("\n".join(report_lines))

In [None]:
# Two diagnostic panels side by side: predicted-vs-actual and residuals
fig, (ax_scatter, ax_resid) = plt.subplots(1, 2, figsize=(12, 5))

# Left panel: predictions against actual values with the y = x reference line
actual_lo, actual_hi = y_test_actual.min(), y_test_actual.max()
ax_scatter.scatter(y_test_actual, y_pred, alpha=0.5, s=10)
ax_scatter.plot([actual_lo, actual_hi],
                [actual_lo, actual_hi],
                'r--', linewidth=2, label='Perfect Prediction')
ax_scatter.set_xlabel('Actual PV Output (W)', fontsize=12)
ax_scatter.set_ylabel('Predicted PV Output (W)', fontsize=12)
ax_scatter.set_title('Predicted vs Actual Solar Output', fontsize=14)
ax_scatter.legend()
ax_scatter.grid(True, alpha=0.3)

# Right panel: residuals (actual minus predicted) against the prediction
residuals = y_test_actual - y_pred
ax_resid.scatter(y_pred, residuals, alpha=0.5, s=10)
ax_resid.axhline(y=0, color='r', linestyle='--', linewidth=2)
ax_resid.set_xlabel('Predicted PV Output (W)', fontsize=12)
ax_resid.set_ylabel('Residuals (W)', fontsize=12)
ax_resid.set_title('Residual Plot', fontsize=14)
ax_resid.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Plot actual vs predicted values for the first rows of the test set.
# The x-axis is the position in the test split, not time: train_test_split
# shuffles rows by default, so neighbouring points are not consecutive hours.
n_samples = min(200, len(y_test_actual))

plt.figure(figsize=(14, 6))
plt.plot(range(n_samples), y_test_actual[:n_samples], 
         label='Actual', linewidth=2, alpha=0.7)
plt.plot(range(n_samples), y_pred[:n_samples], 
         label='Predicted', linewidth=2, alpha=0.7)
plt.xlabel('Sample Index', fontsize=12)
plt.ylabel('PV Output (W)', fontsize=12)
# Report the number of samples actually drawn (fewer than 200 on a small test set)
plt.title(f'Actual vs Predicted Solar Output (First {n_samples} samples)', fontsize=14)
plt.legend(fontsize=12)
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
# Save the model checkpoint.
# torch.save does not create missing directories, so make sure the target
# folder exists before writing.
os.makedirs(MODEL_DIR, exist_ok=True)
model_path = f"{MODEL_DIR}/pytorch_ann_model.pth"
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    # NOTE: the scalers are pickled scikit-learn objects, so this checkpoint
    # has to be loaded with torch.load(..., weights_only=False) - only do that
    # with files you trust.
    'scaler_X': scaler_X,
    'scaler_y': scaler_y,
    'feature_cols': feature_cols,
    'target_col': target_col,
    'train_losses': train_losses,
    'test_losses': test_losses,
    # Stored as plain Python floats rather than NumPy scalars
    'metrics': {
        'mae': float(mae),
        'rmse': float(rmse),
        'r2': float(r2),
        'n_mae': float(n_mae),
        'n_rmse': float(n_rmse)
    }
}, model_path)

print(f"Model saved to: {model_path}")

## Summary

This notebook demonstrates a simple PyTorch Artificial Neural Network (ANN) for predicting solar output (PV in Watts).

### Model Architecture:
- Input Layer: Variable size based on features
- Hidden Layer 1: 128 neurons with ReLU activation
- Hidden Layer 2: 64 neurons with ReLU activation
- Hidden Layer 3: 32 neurons with ReLU activation
- Output Layer: 1 neuron (regression output)
- Dropout: 0.2, applied after hidden layers 1 and 2, for regularization

### Training Configuration:
- Optimizer: Adam with learning rate 0.001
- Loss Function: Mean Squared Error (MSE)
- Batch Size: 64
- Epochs: 200

### Key Features:
- Uses the same features as the KAN model for a fair comparison
- Includes weather features (temperature, humidity, wind, etc.)
- Incorporates temporal features (time of day, day of year, month)
- Uses clear sky power output as an important predictor

The model can be used for real-time solar output prediction and energy forecasting.