Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pandas, plotly, pulp, pendulum, sqlalchemy, psycopg2-binary, marshmallow, pyyaml

| Date | PR | Branch | Summary |
|------|-----|--------|---------|
| 2026-04-30 | TBD | feature/profile-filter-and-nameplate | Opt-in profile anomaly filter (`maxEnergyPerIntervalKwh`) + display-only `nameplateKwp` metadata field on Profile (v2.0.4) |
| 2026-01-12 | - | main | License changed from AGPL-3.0 to MIT |
| 2026-01-09 | #59 | bugfix/nan-threshold-aggregation | Fix NaN propagation in rate averages and cost totals (v2.0.1) |
| 2026-01-09 | #58 | feature/flux-schema-support | Flux/flows schema separation for DB queries (v2.0.0) |
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "skypro"
version = "2.0.3"
version = "2.0.4"
description = "Skyprospector by Cepro"
authors = ["damonrand <damon@cepro.energy>"]
license = "MIT"
Expand Down
14 changes: 14 additions & 0 deletions src/skypro/commands/simulator/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ class Profile:
profiled_size_kwp: Optional[float] = field_with_opts(key="profiledSizeKwp")
scaled_size_kwp: Optional[float] = field_with_opts(key="scaledSizeKwp")

# Display-only metadata: installed nameplate kWp. Independent of any
# scaling — engine never reads this for math. Useful when csvProfile
# already carries real kWh values (no scaling) but downstream tools
# still need to know the system size.
nameplate_kwp: Optional[float] = field_with_opts(key="nameplateKwp")

# Opt-in defensive filter for corrupt meter data. When set, csvProfile
# rows with |energy| above this threshold are NaN'd and linearly
# interpolated. Default (None) = no filter; raw values pass through so
# bad data surfaces loudly in sim output. Set per-profile in YAML when
# a known upstream pipeline emits anomalies (e.g. occasional metering
# glitches that produce values orders of magnitude beyond plausible).
max_energy_per_interval_kwh: Optional[float] = field_with_opts(key="maxEnergyPerIntervalKwh")

def __post_init__(self):
# There are three ways of setting the scaling factor: by 'kwp' fields; by 'num plot' fields; or by
# explicitly setting the 'scalingFactor'. This is partly to support older configurations.
Expand Down
1 change: 1 addition & 0 deletions src/skypro/commands/simulator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,7 @@ def _process_profiles(
source=profile_config.source,
time_index=time_index,
file_path_resolver_func=file_path_resolver_func,
max_energy_per_interval_kwh=profile_config.max_energy_per_interval_kwh,
)

profiler = Profiler(
Expand Down
56 changes: 54 additions & 2 deletions src/skypro/common/data/get_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,22 @@ def get_profile(
source: ProfileDataSource,
time_index: pd.DatetimeIndex,
file_path_resolver_func: Optional[Callable],
max_energy_per_interval_kwh: Optional[float] = None,
) -> pd.DataFrame:
"""
Reads the profile data source and returns a dataframe containing the profile with the given time index
Reads the profile data source and returns a dataframe containing the profile with the given time index.

If max_energy_per_interval_kwh is set, CSV profiles are filtered: rows with |energy| above the
threshold are replaced with NaN and linearly interpolated. Default (None) is no filter — raw
values pass through, so corrupted source data surfaces loudly in downstream sim output rather
than being silently rewritten.
"""

if source.csv_profile_data_source:
df = _get_csv_profile(
source=source.csv_profile_data_source,
file_path_resolver_func=file_path_resolver_func,
max_energy_per_interval_kwh=max_energy_per_interval_kwh,
)
elif source.constant_profile_data_source:
df = _get_constant_profile(
Expand Down Expand Up @@ -50,13 +57,50 @@ def _get_constant_profile(
def _get_csv_profile(
source: CSVProfileDataSource,
file_path_resolver_func: Optional[Callable],
max_energy_per_interval_kwh: Optional[float] = None,
) -> pd.DataFrame:
"""
Returns a profile using the given CSV files
Returns a profile using the given CSV files.

When max_energy_per_interval_kwh is set, energy values above this absolute threshold are
treated as anomalous: replaced with NaN, then linearly interpolated (with ffill/bfill at
edges). Activate it on a per-profile basis from YAML via maxEnergyPerIntervalKwh when
upstream metering is known to emit corrupt rows. The default (None) is opt-in: raw values
pass through unchanged unless a threshold is explicitly set.
"""

df = get_csv_data_source(source, file_path_resolver_func)

# Defensive filtering: opt-in via max_energy_per_interval_kwh.
if max_energy_per_interval_kwh is not None and "energy" in df.columns:
anomalous_mask = df["energy"].abs() > max_energy_per_interval_kwh
num_anomalous = anomalous_mask.sum()
if num_anomalous > 0:
anomalous_rows = df[anomalous_mask]
max_val = anomalous_rows["energy"].max()
min_val = anomalous_rows["energy"].min()
# Surface enough detail for an operator to chase the upstream meter glitch:
# count, range, and the first/last bad timestamps when available.
time_col = next(
(c for c in ("UTCTime", "ClockTime") if c in anomalous_rows.columns),
None,
)
if time_col is not None and len(anomalous_rows) > 0:
ts_str = (
f" Timestamps: {anomalous_rows[time_col].iloc[0]} → "
f"{anomalous_rows[time_col].iloc[-1]}."
)
else:
ts_str = ""
logging.warning(
f"Dropped {num_anomalous} rows with anomalous energy values "
f"(>{max_energy_per_interval_kwh} kWh per interval). "
f"Range: {min_val:.2f} to {max_val:.2f} kWh.{ts_str} "
f"This typically indicates corrupted meter data."
)
# Set anomalous values to NaN (will be interpolated below).
df.loc[anomalous_mask, "energy"] = np.nan

# Prefer to use the UTCTime column, but if it's not present then use ClockTime with the Europe/London timezone
use_clocktime = "UTCTime" not in df.columns or np.all(pd.isnull(df["UTCTime"]))
if use_clocktime:
Expand All @@ -82,4 +126,12 @@ def _get_csv_profile(
if "ClockTime" in df.columns:
df = df.drop("ClockTime", axis=1)

# Interpolate any NaN values in energy column (from anomalous data filtering).
if "energy" in df.columns and df["energy"].isna().any():
num_nans = df["energy"].isna().sum()
df["energy"] = df["energy"].interpolate(method="linear")
# Edge cases (NaN at start/end) — forward/backward fill.
df["energy"] = df["energy"].ffill().bfill()
logging.info(f"Interpolated {num_nans} missing energy values in profile")

return df
Empty file.
117 changes: 117 additions & 0 deletions src/tests/unit/skypro/common/data/test_get_profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import unittest
import tempfile
import os

from skypro.common.data.get_profile import _get_csv_profile
from skypro.common.config.data_source_csv import CSVProfileDataSource


class TestGetProfileDefensiveFiltering(unittest.TestCase):
    """Tests for the opt-in anomaly filter in _get_csv_profile.

    The filter is activated per-profile via max_energy_per_interval_kwh.
    Without it, raw values pass through unchanged so corrupt source data
    surfaces loudly downstream rather than being silently rewritten.
    """

    def _make_profile_csv(self, rows):
        """Write a temporary CSV profile and return its path.

        rows: iterable of (timestamp, energy) pairs; the same timestamp is
        written to both the UTCTime and ClockTime columns, matching the
        fixtures these tests exercise. Deletion is registered with
        addCleanup so the file is removed even if the test body raises.
        """
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
            f.write("UTCTime,ClockTime,energy\n")
            for timestamp, energy in rows:
                f.write(f"{timestamp},{timestamp},{energy}\n")
            path = f.name
        self.addCleanup(os.unlink, path)
        return path

    def _load_energies(self, path, threshold=None):
        """Run _get_csv_profile over the file and return the energy column as a list."""
        source = CSVProfileDataSource(dir=None, file=path)
        df = _get_csv_profile(source, None, max_energy_per_interval_kwh=threshold)
        return df['energy'].tolist()

    def test_filter_off_by_default(self):
        """Without max_energy_per_interval_kwh, raw values pass through (even huge ones)."""
        path = self._make_profile_csv([
            ("2025-01-01 00:00:00+00:00", 10.0),
            # Massive value — but no threshold set, so it must survive.
            ("2025-01-01 00:30:00+00:00", 1000000.0),
            ("2025-01-01 01:00:00+00:00", 12.0),
        ])
        energies = self._load_energies(path)  # threshold defaults to None
        self.assertAlmostEqual(energies[0], 10.0)
        self.assertAlmostEqual(energies[1], 1000000.0)  # untouched
        self.assertAlmostEqual(energies[2], 12.0)

    def test_anomalous_values_are_filtered_and_interpolated_when_configured(self):
        """With threshold set, values above it are NaN'd then linearly interpolated."""
        path = self._make_profile_csv([
            ("2025-01-01 00:00:00+00:00", 10.0),
            ("2025-01-01 00:30:00+00:00", 12.0),
            ("2025-01-01 01:00:00+00:00", 1000000.0),
            ("2025-01-01 01:30:00+00:00", 14.0),
            ("2025-01-01 02:00:00+00:00", 16.0),
        ])
        energies = self._load_energies(path, threshold=500)
        self.assertEqual(len(energies), 5)
        self.assertAlmostEqual(energies[0], 10.0)
        self.assertAlmostEqual(energies[1], 12.0)
        self.assertAlmostEqual(energies[2], 13.0)  # interpolated mid-gap
        self.assertAlmostEqual(energies[3], 14.0)
        self.assertAlmostEqual(energies[4], 16.0)
        self.assertTrue(all(e <= 500 for e in energies))

    def test_normal_values_unchanged_when_threshold_set(self):
        """If threshold is set but no values exceed it, the data passes through untouched."""
        path = self._make_profile_csv([
            ("2025-01-01 00:00:00+00:00", 10.0),
            ("2025-01-01 00:30:00+00:00", 20.0),
            ("2025-01-01 01:00:00+00:00", 30.0),
        ])
        energies = self._load_energies(path, threshold=500)
        self.assertAlmostEqual(energies[0], 10.0)
        self.assertAlmostEqual(energies[1], 20.0)
        self.assertAlmostEqual(energies[2], 30.0)

    def test_edge_anomaly_uses_ffill_bfill(self):
        """Anomaly at start of series gets back-filled (interpolation can't help at edges)."""
        path = self._make_profile_csv([
            ("2025-01-01 00:00:00+00:00", 999999.0),
            ("2025-01-01 00:30:00+00:00", 10.0),
            ("2025-01-01 01:00:00+00:00", 15.0),
        ])
        energies = self._load_energies(path, threshold=500)
        self.assertAlmostEqual(energies[0], 10.0)  # back-filled
        self.assertAlmostEqual(energies[1], 10.0)
        self.assertAlmostEqual(energies[2], 15.0)

    def test_negative_anomalous_values_filtered(self):
        """Filter uses abs(), so large-magnitude negative values are also caught."""
        path = self._make_profile_csv([
            ("2025-01-01 00:00:00+00:00", 10.0),
            ("2025-01-01 00:30:00+00:00", -1000000.0),
            ("2025-01-01 01:00:00+00:00", 20.0),
        ])
        energies = self._load_energies(path, threshold=500)
        self.assertAlmostEqual(energies[0], 10.0)
        self.assertAlmostEqual(energies[1], 15.0)  # interpolated
        self.assertAlmostEqual(energies[2], 20.0)


# Allow running this test module directly (e.g. `python test_get_profile.py`)
# in addition to discovery via pytest/unittest.
if __name__ == '__main__':
    unittest.main()
Loading