Skip to content

Commit

Permalink
Merge pull request #6 from DataDog/stephen/allow-duplicate-t-values
Browse files Browse the repository at this point in the history
Allow data sets with duplicate `t` values
  • Loading branch information
StephenKappel committed Mar 29, 2018
2 parents 04240ac + 3ff4132 commit 86a0bdf
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 92 deletions.
41 changes: 32 additions & 9 deletions piecewise/regressor.py
Expand Up @@ -199,7 +199,7 @@ def segments(self):


def _preprocess(t, v):
""" Raises and exception if any of the inputs are not valid.
""" Raises an exception if any of the inputs are not valid.
Otherwise, returns a list of Points, ordered by t.
"""
# Validate the inputs.
Expand All @@ -212,8 +212,6 @@ def _preprocess(t, v):
if np.sum(finite_mask) < 2:
raise ValueError('`v` must have at least 2 finite values.')
t_arr, v_arr = t_arr[finite_mask], v_arr[finite_mask]
if len(np.unique(t_arr)) != len(t_arr):
raise ValueError('All `t` values must be unique.')

# Order both arrays by t-values.
sort_order = np.argsort(t_arr)
Expand All @@ -237,17 +235,42 @@ def _get_initial_segments(t, v):
still be suboptimal initializations, as in this case, where the two 1s will
be initialized in the same segment: [19, 10, 1, 1, -8, -17]
"""
# If there are multiple values at the same t, average them and treat them
# like a single point during initialization. This ensures that all the
# points with the same t are assigned to the same linear segment.
index_ranges, averages = [], []
start_index, last_t = 0, t[0]
for i in range(1, len(t)):
if t[i] != last_t:
index_ranges.append((start_index, i))
averages.append(np.mean(v[start_index:i]))
start_index = i
last_t = t[i]
index_ranges.append((start_index, i+1))
averages.append(np.mean(v[start_index:]))

# Pair every other t with the t on its left or on its right, based on which
# is closer.
seed_assignments = defaultdict(list)
for i in range(1, len(t), 2):
left_diff = abs(v[i-1] - v[i])
right_diff = abs(v[i+1] - v[i]) if len(v) > i+1 else np.inf
for i in range(1, len(averages), 2):
left_diff = abs(averages[i-1] - averages[i])
right_diff = abs(averages[i+1] - averages[i]) if len(averages) > i+1 else np.inf
best_seed = i-1 if left_diff < right_diff else i+1
seed_assignments[best_seed].append(i)

# Build the Segment objects.
segments = []
for i in range(0, len(t), 2):
indices = seed_assignments[i] + [i]
start_index, end_index = min(indices), max(indices)+1
for i in range(0, len(index_ranges), 2):
start_index = min([
index_ranges[j][0]
for j in seed_assignments[i] + [i]
])
end_index = max([
index_ranges[j][1]
for j in seed_assignments[i] + [i]
])
segments.append(_make_segment(t, v, start_index, end_index))

return segments


Expand Down
188 changes: 105 additions & 83 deletions tests/test_piecewise.py
@@ -1,97 +1,119 @@
# std
import unittest

# 3p
import numpy as np

# prj
from piecewise.regressor import piecewise


def test_single_line():
    """ Noisy samples drawn around one straight line must be fit with exactly
    one segment.
    """
    # Seed the global generator so the noise is reproducible, then sample
    # 2000 points around the line -45 + 0.7*t.
    np.random.seed(1)
    true_intercept, true_slope = -45.0, 0.7
    t = np.arange(2000)
    noise = np.random.normal(0, 1, 2000)
    v = true_intercept + true_slope*t + noise

    fitted = piecewise(t, v)

    # Exactly one segment is expected, spanning the first to the last t.
    np.testing.assert_equal(len(fitted.segments), 1)
    only_segment = fitted.segments[0]
    np.testing.assert_equal(only_segment.start_t, 0)
    np.testing.assert_equal(only_segment.end_t, 1999)

    # coeffs[0] is compared with the intercept and coeffs[1] with the slope;
    # decimal=0 makes this a loose, order-of-magnitude comparison.
    np.testing.assert_almost_equal(
        only_segment.coeffs[0], true_intercept, decimal=0)
    np.testing.assert_almost_equal(
        only_segment.coeffs[1], true_slope, decimal=0)

class TestPiecewise(unittest.TestCase):

def test_single_line_with_nans():
    """ NaNs scattered through `v` must not break the fit; NaNs at the very
    start and end must shrink the segment's domain accordingly.
    """
    # Reproducible noisy line: -45 + 0.7*t plus unit Gaussian noise.
    np.random.seed(1)
    true_intercept, true_slope = -45.0, 0.7
    t = np.arange(2000)
    noise = np.random.normal(0, 1, 2000)
    v = true_intercept + true_slope*t + noise

    # Knock out the first and last samples plus a few interior ones
    # (including a run of three consecutive points).
    nan_positions = [0, 24, 400, 401, 402, 1000, 1999]
    v[nan_positions] = np.nan

    fitted = piecewise(t, v)

    # Still one segment, but its domain now starts at t=1 and ends at
    # t=1998 because the samples at t=0 and t=1999 are NaN.
    np.testing.assert_equal(len(fitted.segments), 1)
    only_segment = fitted.segments[0]
    np.testing.assert_equal(only_segment.start_t, 1)
    np.testing.assert_equal(only_segment.end_t, 1998)

    # Loose (decimal=0) comparison against the generating coefficients.
    np.testing.assert_almost_equal(
        only_segment.coeffs[0], true_intercept, decimal=0)
    np.testing.assert_almost_equal(
        only_segment.coeffs[1], true_slope, decimal=0)
def test_single_line(self):
    """ A single noisy linear trend should come back as one segment whose
    coefficients roughly match the generating line.
    """
    # Fixed seed => identical noise on every run.
    np.random.seed(1)
    line_intercept = -45.0
    line_slope = 0.7
    t = np.arange(2000)
    jitter = np.random.normal(0, 1, 2000)
    v = line_intercept + line_slope*t + jitter

    result = piecewise(t, v)

    # One segment covering t=0 through t=1999.
    np.testing.assert_equal(len(result.segments), 1)
    segment = result.segments[0]
    np.testing.assert_equal(segment.start_t, 0)
    np.testing.assert_equal(segment.end_t, 1999)

    # Intercept first, slope second; decimal=0 keeps the tolerance loose.
    np.testing.assert_almost_equal(
        segment.coeffs[0], line_intercept, decimal=0)
    np.testing.assert_almost_equal(
        segment.coeffs[1], line_slope, decimal=0)

def test_single_line_with_nans(self):
    """ Missing (NaN) values are tolerated, and NaNs at either end of the
    series are left out of the fitted segment's domain.
    """
    # Fixed seed => identical noise on every run.
    np.random.seed(1)
    line_intercept = -45.0
    line_slope = 0.7
    t = np.arange(2000)
    jitter = np.random.normal(0, 1, 2000)
    v = line_intercept + line_slope*t + jitter

    # Blank out both endpoints and several interior samples.
    missing = [0, 24, 400, 401, 402, 1000, 1999]
    v[missing] = np.nan

    result = piecewise(t, v)

    # One segment, trimmed to t=1..1998 since t=0 and t=1999 carry NaN.
    np.testing.assert_equal(len(result.segments), 1)
    segment = result.segments[0]
    np.testing.assert_equal(segment.start_t, 1)
    np.testing.assert_equal(segment.end_t, 1998)

    # Intercept first, slope second; decimal=0 keeps the tolerance loose.
    np.testing.assert_almost_equal(
        segment.coeffs[0], line_intercept, decimal=0)
    np.testing.assert_almost_equal(
        segment.coeffs[1], line_slope, decimal=0)

def test_five_segments():
    """ A sawtooth made of five ramps should be split into five segments at
    the points where the ramp resets.
    """
    # v climbs 0..19 and resets every 20 steps of t, giving five ramps over
    # t = 1900..1999.
    t = np.arange(1900, 2000)
    v = np.mod(t, 20)

    fitted = piecewise(t, v)

    # Five segments, every one of them with unit slope.
    np.testing.assert_equal(len(fitted.segments), 5)
    for ramp in fitted.segments:
        np.testing.assert_almost_equal(ramp.coeffs[1], 1.0)

    # Segments come back in time order, each starting 20 units after the
    # previous one.
    for position, expected_start in enumerate(range(1900, 2000, 20)):
        np.testing.assert_equal(
            fitted.segments[position].start_t, expected_start)
def test_five_segments(self):
    """ Data built from several distinct linear pieces should yield one
    segment per piece, with breakpoints where the pieces meet.
    """
    # Sawtooth: v = t mod 20 over t = 1900..1999, i.e. five ramps of
    # slope 1.
    t = np.arange(1900, 2000)
    v = np.mod(t, 20)

    result = piecewise(t, v)

    # Expect five segments, all with slope 1.
    np.testing.assert_equal(len(result.segments), 5)
    for piece in result.segments:
        np.testing.assert_almost_equal(piece.coeffs[1], 1.0)

    # Expect the segments in time order with starts at 1900, 1920, ..., 1980.
    expected_starts = (1900, 1920, 1940, 1960, 1980)
    for index, start in enumerate(expected_starts):
        np.testing.assert_equal(result.segments[index].start_t, start)

def test_messy_ts(self):
""" Unevenly-spaced, out-of-order, float t-values should work.
"""
# NOTE(review): this span appears to interleave two renderings of the same
# test (a method taking `self` and a module-level function); the statements
# are kept exactly as found and only comments are added.
# Generate some step-function data.
# t is deliberately unsorted and unevenly spaced. Pairing each t with its v:
# v is 0 for t in {0.2, 0.4, 0.5} and 5 for t in {1.0, 1.1, 2.3}.
t = [1.0, 0.2, 0.5, 0.4, 2.3, 1.1]
v = [5, 0, 0, 0, 5, 5]
# Fit the piecewise regression.
model = piecewise(t, v)
# There should be two constant-valued segments.
np.testing.assert_equal(len(model.segments), 2)
seg1, seg2 = model.segments

def test_messy_ts():
""" Unevenly-spaced, out-of-order, float t-values should work.
"""
# Generate some step-function data.
# Same inputs as above: v is 0 for t <= 0.5 and 5 for t >= 1.0.
t = [1.0, 0.2, 0.5, 0.4, 2.3, 1.1]
v = [5, 0, 0, 0, 5, 5]
# Fit the piecewise regression.
model = piecewise(t, v)
# There should be two constant-valued segments.
np.testing.assert_equal(len(model.segments), 2)
seg1, seg2 = model.segments
# First segment: starts at the smallest t (0.2), ends at 1.0, and both
# coefficients are expected to be approximately 0.
np.testing.assert_equal(seg1.start_t, 0.2)
np.testing.assert_equal(seg1.end_t, 1.0)
np.testing.assert_almost_equal(seg1.coeffs[0], 0)
np.testing.assert_almost_equal(seg1.coeffs[1], 0)

# First-segment checks again (the other rendering of the same assertions).
np.testing.assert_equal(seg1.start_t, 0.2)
np.testing.assert_equal(seg1.end_t, 1.0)
np.testing.assert_almost_equal(seg1.coeffs[0], 0)
np.testing.assert_almost_equal(seg1.coeffs[1], 0)
# Second segment: starts at 1.0 (the same value as seg1.end_t), ends at the
# largest t (2.3), with coeffs[0] ~ 5 and coeffs[1] ~ 0.
np.testing.assert_equal(seg2.start_t, 1.0)
np.testing.assert_equal(seg2.end_t, 2.3)
np.testing.assert_almost_equal(seg2.coeffs[0], 5)
np.testing.assert_almost_equal(seg2.coeffs[1], 0)

# Second-segment checks again (the other rendering of the same assertions).
np.testing.assert_equal(seg2.start_t, 1.0)
np.testing.assert_equal(seg2.end_t, 2.3)
np.testing.assert_almost_equal(seg2.coeffs[0], 5)
np.testing.assert_almost_equal(seg2.coeffs[1], 0)
def test_non_unique_ts(self):
    """ A dataset with multiple values with the same t should not break the
    code, and all points with the same t should be assigned to the same
    segment.
    """
    # Seed the global generator, as the other randomized tests in this file
    # do, so the sampled data (and therefore the outcome) is reproducible.
    np.random.seed(1)

    # Two noisy plateaus: 100 points around 3 on t=0..99, then 100 points
    # around 20 on t=99..198. t=99 therefore appears twice, once on each
    # plateau.
    t1 = list(range(100))
    v1 = list(np.random.normal(3, 1, 100))
    t2 = list(range(99, 199))
    v2 = list(np.random.normal(20, 1, 100))
    t = t1 + t2
    v = v1 + v2

    # Fit the piecewise regression.
    model = piecewise(t, v)

    # There should be two segments, and the split shouldn't be in the middle
    # of t=99.
    np.testing.assert_equal(len(model.segments), 2)
    seg1, seg2 = model.segments
    # NOTE(review): test_messy_ts expects adjacent segments to share a
    # boundary (seg1.end_t == seg2.start_t == 1.0), so this equality may hold
    # wherever the split lands -- confirm it really detects a split inside
    # the duplicated t.
    np.testing.assert_equal(seg1.end_t, seg2.start_t)

0 comments on commit 86a0bdf

Please sign in to comment.