Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions cpp/csp/cppnodes/statsimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,19 @@ class Variance
void add( double x )
{
m_count++;
// Track consecutive same values to avoid numerical errors when all values are identical
if( m_count == 1 )
{
m_lastValue = x;
m_consecutiveValueCount = 1;
}
if( x == m_lastValue && m_count > 1)
m_consecutiveValueCount++;
else
{
m_lastValue = x;
m_consecutiveValueCount = 1;
}
m_dx = x - m_mean;
m_mean += m_dx / m_count;
m_unnormVar += ( x - m_mean ) * m_dx;
Expand All @@ -405,9 +418,14 @@ class Variance
void remove( double x )
{
m_count--;
// Note: For sliding windows, we cannot accurately track consecutive values when removing from the beginning,
// and we don't need to. Since we are checking m_consecutiveValueCount >= m_count in compute,
// the only events needs reset is a new value comes in at the end.
if( m_count == 0 )
{
m_mean = m_unnormVar = 0;
m_consecutiveValueCount = 0;
m_lastValue = 0;
return;
}
m_dx = x - m_mean;
Expand All @@ -418,12 +436,19 @@ class Variance
void reset()
{
m_mean = m_unnormVar = m_count = 0;
m_consecutiveValueCount = 0;
m_lastValue = 0;
}

double compute() const
{
if( m_count > m_ddof )
{
// Check if all values are identical
if( m_consecutiveValueCount >= m_count [[unlikely]])
return 0.0;
return ( m_unnormVar < 0 ? 0 : m_unnormVar / ( m_count - m_ddof ) );
}

return std::numeric_limits<double>::quiet_NaN();
}
Expand All @@ -435,6 +460,9 @@ class Variance
double m_dx;
double m_count;
int64_t m_ddof;
// Below variables are used to eliminate numerical errors when all values in the window are identical
double m_lastValue;
int64_t m_consecutiveValueCount;
};

class WeightedVariance
Expand All @@ -455,6 +483,20 @@ class WeightedVariance
{
if( w <= 0 )
return;
// Track consecutive same values and observation count
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the same comments apply for WeightedVariance as for Variance

m_count++;
if( m_count == 1 )
{
m_lastValue = x;
m_consecutiveValueCount = 1;
}
if( x == m_lastValue && m_count > 1 )
m_consecutiveValueCount++;
else
{
m_lastValue = x;
m_consecutiveValueCount = 1;
}
m_wsum += w;
m_dx = x - m_wmean;
m_wmean += ( w / m_wsum ) * m_dx;
Expand All @@ -463,10 +505,14 @@ class WeightedVariance

void remove( double x, double w )
{
m_count--;
m_wsum -= w;
if( m_wsum < EPSILON )
{
m_wsum = m_wmean = m_unnormWVar = 0;
m_count = 0;
m_consecutiveValueCount = 0;
m_lastValue = 0;
return;
}
m_dx = x - m_wmean;
Expand All @@ -477,12 +523,20 @@ class WeightedVariance
void reset()
{
m_wsum = m_wmean = m_unnormWVar = 0;
m_count = 0;
m_consecutiveValueCount = 0;
m_lastValue = 0;
}

double compute() const
{
if( m_wsum > m_ddof )
{
// Check if all values are identical
if( m_consecutiveValueCount >= m_count [[unlikely]])
return 0.0;
return ( m_unnormWVar < 0 ? 0 : m_unnormWVar / ( m_wsum - m_ddof ) );
}

return std::numeric_limits<double>::quiet_NaN();
}
Expand All @@ -494,6 +548,10 @@ class WeightedVariance
double m_unnormWVar;
double m_dx;
int64_t m_ddof;
// Below variables are used to eliminate numerical errors when all values in the window are identical
int64_t m_count;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove m_count here (see the prior comment about how its not needed in the if-check in compute).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated weighted variance to match variance. m_count as a variable is still needed in weighted variance because we need it to initialize m_lastValue and m_consecutiveValueCount.

double m_lastValue;
int64_t m_consecutiveValueCount;
};

class Covariance
Expand Down
79 changes: 79 additions & 0 deletions csp/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -3670,6 +3670,85 @@ def g():
)
np.testing.assert_allclose(res["ema_std"][1], golden_ema_std, atol=1e-10)

golden_ema_std = np.array(
[
pd.Series(values[max(0, j - horizon + 1): j + 1]).ewm(alpha=alpha, ignore_na=True).std().iloc[-1]
for j in range(N)
]
)
np.testing.assert_allclose(res["ema_std"][1], golden_ema_std, atol=1e-10)

def test_identical_values_variance(self):
"""Test that variance and weighted variance are exactly 0 when all values in window are identical"""
st = datetime(2023, 1, 1, 9, 0, 0)

@csp.node
def generate_data() -> csp.ts[float]:
"""Generate data with periods of identical values"""
with csp.alarms():
alarm = csp.alarm(bool)

with csp.start():
# Period 1: 5 ticks of -1.0
for i in range(5):
csp.schedule_alarm(alarm, timedelta(seconds=i * 10), True)

# Period 2: 5 ticks of -2.0
for i in range(5):
csp.schedule_alarm(alarm, timedelta(minutes=1, seconds=i * 10), True)

# Period 3: 5 ticks of -3.0
for i in range(5):
csp.schedule_alarm(alarm, timedelta(minutes=2, seconds=i * 10), True)

if csp.ticked(alarm):
current_time = csp.now()
seconds = (current_time - st).total_seconds()

if seconds < 60:
return -1.0 # Period 1
elif seconds < 120:
return -2.0 # Period 2
else:
return -3.0 # Period 3

@csp.graph
def graph():
# Generate data with identical values in each 1-minute window
data = generate_data()
# Generate uniform weights that tick at the same time as data
weights = data * 0.0 + 1.0

# Test variance and weighted variance with 1-minute resampling
resample_interval = timedelta(seconds=60)
timer = csp.timer(interval=resample_interval, value=True)

# Regular variance
var_result = csp.stats.var(data, interval=resample_interval, trigger=timer)

# Weighted variance (using uniform weights of 1.0)
wvar_result = csp.stats.var(data, interval=resample_interval, trigger=timer, weights=weights)

csp.add_graph_output("variance", var_result)
csp.add_graph_output("weighted_variance", wvar_result)

end_time = st + timedelta(minutes=3) # End at 9:03 to avoid 4th minute output
results = csp.run(graph, starttime=st, endtime=end_time)

# Convert results to DataFrame
df = pd.DataFrame({
'variance': [v for _, v in results["variance"]],
'weighted_variance': [v for _, v in results["weighted_variance"]]
})

# Assert 1: weighted variance should equal unweighted variance
pd.testing.assert_series_equal(df['variance'], df['weighted_variance'], check_names=False)

# Assert 2: 3rd minute (index 2) should be zero, first two should be non-zero
self.assertEqual(df.iloc[2]['variance'], 0.0) # 3rd minute should be exactly 0
self.assertGreater(df.iloc[0]['variance'], 0.0) # 1st minute should be > 0
self.assertGreater(df.iloc[1]['variance'], 0.0) # 2nd minute should be > 0


if __name__ == "__main__":
unittest.main()