diff --git a/cpp/csp/cppnodes/statsimpl.h b/cpp/csp/cppnodes/statsimpl.h
index 0c32b138d..9fbf91ce7 100644
--- a/cpp/csp/cppnodes/statsimpl.h
+++ b/cpp/csp/cppnodes/statsimpl.h
@@ -397,6 +397,19 @@ class Variance
     void add( double x )
     {
         m_count++;
+        // Track runs of consecutive identical values to avoid numerical errors when all values in the window are identical
+        if( m_count == 1 )
+        {
+            m_lastValue = x;
+            m_consecutiveValueCount = 1;
+        }
+        if( x == m_lastValue && m_count > 1 )
+            m_consecutiveValueCount++;
+        else
+        {
+            m_lastValue = x;
+            m_consecutiveValueCount = 1;
+        }
         m_dx = x - m_mean;
         m_mean += m_dx / m_count;
         m_unnormVar += ( x - m_mean ) * m_dx;
@@ -405,9 +418,14 @@ class Variance
     void remove( double x )
     {
         m_count--;
+        // Note: for sliding windows we cannot accurately track consecutive values when removing from the
+        // beginning, and we don't need to: since compute() checks m_consecutiveValueCount >= m_count,
+        // the only event that requires a reset is a new value arriving at the end.
         if( m_count == 0 )
         {
             m_mean = m_unnormVar = 0;
+            m_consecutiveValueCount = 0;
+            m_lastValue = 0;
             return;
         }
         m_dx = x - m_mean;
@@ -418,12 +436,19 @@ class Variance
     void reset()
     {
         m_mean = m_unnormVar = m_count = 0;
+        m_consecutiveValueCount = 0;
+        m_lastValue = 0;
     }
 
     double compute() const
     {
         if( m_count > m_ddof )
+        {
+            // All values in the window are identical: the variance is exactly 0
+            if( m_consecutiveValueCount >= m_count ) [[unlikely]]
+                return 0.0;
             return ( m_unnormVar < 0 ? 0 : m_unnormVar / ( m_count - m_ddof ) );
+        }
         return std::numeric_limits<double>::quiet_NaN();
     }
 
@@ -435,6 +460,9 @@ class Variance
     double m_dx;
     double m_count;
     int64_t m_ddof;
+    // The members below are used to eliminate numerical errors when all values in the window are identical
+    double m_lastValue;
+    int64_t m_consecutiveValueCount;
 };
 
 class WeightedVariance
@@ -455,6 +483,20 @@ class WeightedVariance
     {
         if( w <= 0 )
             return;
+        // Track runs of consecutive identical values and the observation count
+        m_count++;
+        if( m_count == 1 )
+        {
+            m_lastValue = x;
+            m_consecutiveValueCount = 1;
+        }
+        if( x == m_lastValue && m_count > 1 )
+            m_consecutiveValueCount++;
+        else
+        {
+            m_lastValue = x;
+            m_consecutiveValueCount = 1;
+        }
         m_wsum += w;
         m_dx = x - m_wmean;
         m_wmean += ( w / m_wsum ) * m_dx;
@@ -463,10 +505,14 @@ class WeightedVariance
 
     void remove( double x, double w )
     {
+        m_count--;
         m_wsum -= w;
         if( m_wsum < EPSILON )
         {
             m_wsum = m_wmean = m_unnormWVar = 0;
+            m_count = 0;
+            m_consecutiveValueCount = 0;
+            m_lastValue = 0;
             return;
         }
         m_dx = x - m_wmean;
@@ -477,12 +523,20 @@ class WeightedVariance
     void reset()
     {
         m_wsum = m_wmean = m_unnormWVar = 0;
+        m_count = 0;
+        m_consecutiveValueCount = 0;
+        m_lastValue = 0;
     }
 
     double compute() const
    {
         if( m_wsum > m_ddof )
+        {
+            // All values in the window are identical: the variance is exactly 0
+            if( m_consecutiveValueCount >= m_count ) [[unlikely]]
+                return 0.0;
             return ( m_unnormWVar < 0 ? 0 : m_unnormWVar / ( m_wsum - m_ddof ) );
+        }
         return std::numeric_limits<double>::quiet_NaN();
     }
 
@@ -494,6 +548,10 @@ class WeightedVariance
     double m_unnormWVar;
     double m_dx;
     int64_t m_ddof;
+    // The members below are used to eliminate numerical errors when all values in the window are identical
+    int64_t m_count;
+    double m_lastValue;
+    int64_t m_consecutiveValueCount;
 };
 
 class Covariance
diff --git a/csp/tests/test_stats.py b/csp/tests/test_stats.py
index 71b922a75..45a70c565 100644
--- a/csp/tests/test_stats.py
+++ b/csp/tests/test_stats.py
@@ -3670,6 +3670,85 @@ def g():
         )
         np.testing.assert_allclose(res["ema_std"][1], golden_ema_std, atol=1e-10)
+        golden_ema_std = np.array(
+            [
+                pd.Series(values[max(0, j - horizon + 1): j + 1]).ewm(alpha=alpha, ignore_na=True).std().iloc[-1]
+                for j in range(N)
+            ]
+        )
+        np.testing.assert_allclose(res["ema_std"][1], golden_ema_std, atol=1e-10)
+
+    def test_identical_values_variance(self):
+        """Test that variance and weighted variance are exactly 0 when all values in the window are identical"""
+
+        st = datetime(2023, 1, 1, 9, 0, 0)
+
+        @csp.node
+        def generate_data() -> csp.ts[float]:
+            """Generate data with periods of identical values"""
+            with csp.alarms():
+                alarm = csp.alarm(bool)
+
+            with csp.start():
+                # Period 1: 5 ticks of -1.0
+                for i in range(5):
+                    csp.schedule_alarm(alarm, timedelta(seconds=i * 10), True)
+
+                # Period 2: 5 ticks of -2.0
+                for i in range(5):
+                    csp.schedule_alarm(alarm, timedelta(minutes=1, seconds=i * 10), True)
+
+                # Period 3: 5 ticks of -3.0
+                for i in range(5):
+                    csp.schedule_alarm(alarm, timedelta(minutes=2, seconds=i * 10), True)
+
+            if csp.ticked(alarm):
+                current_time = csp.now()
+                seconds = (current_time - st).total_seconds()
+
+                if seconds < 60:
+                    return -1.0  # Period 1
+                elif seconds < 120:
+                    return -2.0  # Period 2
+                else:
+                    return -3.0  # Period 3
+
+        @csp.graph
+        def graph():
+            # Generate data with identical values in each 1-minute window
+            data = generate_data()
+            # Generate uniform weights that tick at the same time as the data
+            weights = data * 0.0 + 1.0
+
+            # Test variance and weighted variance with 1-minute resampling
+            resample_interval = timedelta(seconds=60)
+            timer = csp.timer(interval=resample_interval, value=True)
+
+            # Regular variance
+            var_result = csp.stats.var(data, interval=resample_interval, trigger=timer)
+
+            # Weighted variance (using uniform weights of 1.0)
+            wvar_result = csp.stats.var(data, interval=resample_interval, trigger=timer, weights=weights)
+
+            csp.add_graph_output("variance", var_result)
+            csp.add_graph_output("weighted_variance", wvar_result)
+
+        end_time = st + timedelta(minutes=3)  # End at 9:03 to avoid a 4th-minute output
+        results = csp.run(graph, starttime=st, endtime=end_time)
+
+        # Convert results to a DataFrame
+        df = pd.DataFrame({
+            "variance": [v for _, v in results["variance"]],
+            "weighted_variance": [v for _, v in results["weighted_variance"]]
+        })
+
+        # Assert 1: weighted variance should equal unweighted variance
+        pd.testing.assert_series_equal(df["variance"], df["weighted_variance"], check_names=False)
+
+        # Assert 2: the 3rd minute (index 2) should be exactly 0, the first two should be non-zero
+        self.assertEqual(df.iloc[2]["variance"], 0.0)  # 3rd minute should be exactly 0
+        self.assertGreater(df.iloc[0]["variance"], 0.0)  # 1st minute should be > 0
+        self.assertGreater(df.iloc[1]["variance"], 0.0)  # 2nd minute should be > 0
 
 
 if __name__ == "__main__":
     unittest.main()
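A note on why the guard above is sufficient: the following is a minimal standalone sketch of a Welford-style sliding-window variance with the same consecutive-identical-value check. It is illustrative only and not csp code; the class name SlidingVariance, the ddof default, and the driver in main() are invented for this example. The reverse update in remove() can leave the running sum of squares marginally nonzero, or even slightly negative, through rounding, so a window that contains only identical values may otherwise report a tiny spurious variance. Because removals only shorten the window from the front, a trailing run of identical values that is at least as long as the window means every value in the window is identical, which is exactly the m_consecutiveValueCount >= m_count check; the counter only needs resetting when a new value arrives, never on removal.

// Illustrative, standalone sketch (not csp code) of the consecutive-identical-value guard.
#include <cstdint>
#include <cstdio>
#include <limits>

class SlidingVariance
{
public:
    void add( double x )
    {
        m_count++;
        // Guard bookkeeping: length of the run of identical trailing values
        if( m_count > 1 && x == m_lastValue )
            m_consecutive++;
        else
        {
            m_lastValue = x;
            m_consecutive = 1;
        }
        double dx = x - m_mean;
        m_mean += dx / m_count;
        m_unnormVar += ( x - m_mean ) * dx;
    }

    void remove( double x )
    {
        m_count--;
        if( m_count == 0 )
        {
            m_mean = m_unnormVar = 0;
            m_lastValue = 0;
            m_consecutive = 0;
            return;
        }
        // Rounding in this reverse update is what can leave m_unnormVar slightly
        // nonzero even after the window has become all-identical.
        double dx = x - m_mean;
        m_mean -= dx / m_count;
        m_unnormVar -= ( x - m_mean ) * dx;
    }

    double compute( int64_t ddof = 1 ) const
    {
        if( m_count <= ddof )
            return std::numeric_limits<double>::quiet_NaN();
        if( m_consecutive >= m_count )  // every value in the window is identical
            return 0.0;
        return m_unnormVar < 0 ? 0 : m_unnormVar / ( m_count - ddof );
    }

private:
    double  m_mean = 0;
    double  m_unnormVar = 0;
    double  m_lastValue = 0;
    int64_t m_count = 0;
    int64_t m_consecutive = 0;
};

int main()
{
    // Slide a 5-element window over five -1.0 ticks followed by five -3.0 ticks,
    // mirroring the resampled windows in the new unit test.
    const double data[] = { -1, -1, -1, -1, -1, -3, -3, -3, -3, -3 };
    SlidingVariance v;
    for( int i = 0; i < 10; ++i )
    {
        v.add( data[i] );
        if( i >= 5 )
            v.remove( data[i - 5] );
    }
    // The window now holds five copies of -3.0; the guard makes this print exactly 0.
    std::printf( "var = %.17g\n", v.compute() );
    return 0;
}

Built with any C++20 compiler (for example g++ -std=c++20), the driver prints var = 0 exactly once the window holds only the -3.0 ticks, whereas dropping the guard leaves the result at the mercy of the accumulated rounding in remove().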