Skip to content

Commit

Permalink
bigtable/bttest: TimestampRangeFilter improvements
Browse files Browse the repository at this point in the history
Introduce TimestampRangeFilterMicros and add support for
the filter in the emulator.

Change-Id: I79d21575ba15de65702d7caa8c86ad1c7f45095b
Reviewed-on: https://code-review.googlesource.com/9563
Reviewed-by: Jonathan Amsterdam <jba@google.com>
  • Loading branch information
garye committed Nov 28, 2016
1 parent 8b767ae commit 7bb67ec
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 15 deletions.
24 changes: 24 additions & 0 deletions bigtable/bigtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,30 @@ func TestClientIntegration(t *testing.T) {
if !reflect.DeepEqual(r, wantRow) {
t.Errorf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
}
// Check timestamp range filtering
r, err = tbl.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 3000)))
if err != nil {
t.Fatalf("Reading row: %v", err)
}
wantRow = Row{"ts": []ReadItem{
{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
}}
if !reflect.DeepEqual(r, wantRow) {
t.Errorf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow)
}
r, err = tbl.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0)))
if err != nil {
t.Fatalf("Reading row: %v", err)
}
wantRow = Row{"ts": []ReadItem{
{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
}}
if !reflect.DeepEqual(r, wantRow) {
t.Errorf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow)
}
// Delete the cell with timestamp 2000 and repeat the last read,
// checking that we get ts 3000 and ts 1000.
mut = NewMutation()
Expand Down
8 changes: 6 additions & 2 deletions bigtable/bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,11 +477,15 @@ func includeCell(f *btpb.RowFilter, fam, col string, cell cell) bool {
inRangeEnd := func() bool { return true }
switch eq := f.ColumnRangeFilter.EndQualifier.(type) {
case *btpb.ColumnRange_EndQualifierClosed:
inRangeEnd = func() bool { return col <= string(eq.EndQualifierClosed)}
inRangeEnd = func() bool { return col <= string(eq.EndQualifierClosed) }
case *btpb.ColumnRange_EndQualifierOpen:
inRangeEnd = func() bool { return col < string(eq.EndQualifierOpen)}
inRangeEnd = func() bool { return col < string(eq.EndQualifierOpen) }
}
return inRangeStart() && inRangeEnd()
case *btpb.RowFilter_TimestampRangeFilter:
// Lower bound is inclusive and defaults to 0, upper bound is exclusive and defaults to infinity.
return cell.ts >= f.TimestampRangeFilter.StartTimestampMicros &&
(f.TimestampRangeFilter.EndTimestampMicros == 0 || cell.ts < f.TimestampRangeFilter.EndTimestampMicros)
}
}

Expand Down
33 changes: 20 additions & 13 deletions bigtable/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,27 +157,34 @@ func (stripValueFilter) proto() *btpb.RowFilter {
// TimestampRangeFilter returns a filter that matches any rows whose timestamp is within the given time bounds. A zero
// time means no bound.
func TimestampRangeFilter(startTime time.Time, endTime time.Time) Filter {
trf := timestampRangeFilter{}
if !startTime.IsZero() {
trf.startTime = Time(startTime)
}
if !endTime.IsZero() {
trf.endTime = Time(endTime)
}
return trf
}

// TimestampRangeFilterMicros returns a filter that matches any rows whose timestamp is within the given time bounds,
// specified in units of microseconds since 1 January 1970. A zero value for the end time is interpreted as no bound.
func TimestampRangeFilterMicros(startTime Timestamp, endTime Timestamp) Filter {
return timestampRangeFilter{startTime, endTime}
}

type timestampRangeFilter struct {
startTime time.Time
endTime time.Time
startTime Timestamp
endTime Timestamp
}

func (trf timestampRangeFilter) String() string {
return fmt.Sprintf("timestamp_range(%s,%s)", trf.startTime, trf.endTime)
}

func (trf timestampRangeFilter) proto() *btpb.RowFilter {
r := &btpb.TimestampRange{}
if !trf.startTime.IsZero() {
r.StartTimestampMicros = trf.startTime.UnixNano() / 1e3
}
if !trf.endTime.IsZero() {
r.EndTimestampMicros = trf.endTime.UnixNano() / 1e3
}
return &btpb.RowFilter{Filter: &btpb.RowFilter_TimestampRangeFilter{r}}
return &btpb.RowFilter{
Filter: &btpb.RowFilter_TimestampRangeFilter{&btpb.TimestampRange{int64(trf.startTime), int64(trf.endTime)}}}
}

// ColumnRangeFilter returns a filter that matches a contiguous range of columns within a single
Expand All @@ -188,16 +195,16 @@ func ColumnRangeFilter(family, start, end string) Filter {

type columnRangeFilter struct {
family string
start string
end string
start string
end string
}

func (crf columnRangeFilter) String() string {
return fmt.Sprintf("columnRangeFilter(%s,%s,%s)", crf.family, crf.start, crf.end)
}

func (crf columnRangeFilter) proto() *btpb.RowFilter {
r := &btpb.ColumnRange{FamilyName:crf.family}
r := &btpb.ColumnRange{FamilyName: crf.family}
if crf.start != "" {
r.StartQualifier = &btpb.ColumnRange_StartQualifierClosed{[]byte(crf.start)}
}
Expand Down

0 comments on commit 7bb67ec

Please sign in to comment.