Skip to content

Commit

Permalink
bigtable: Add per row cell limit/offset filters
Browse files Browse the repository at this point in the history
Change-Id: I4ab06b1ceead1dc0d474c99db3437a15c0f2e830
Reviewed-on: https://code-review.googlesource.com/15010
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
  • Loading branch information
garye committed Jul 25, 2017
1 parent ac0ff3d commit 9a38b47
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 0 deletions.
23 changes: 23 additions & 0 deletions bigtable/bigtable_test.go
Expand Up @@ -490,6 +490,29 @@ func TestClientIntegration(t *testing.T) {
if !reflect.DeepEqual(r, wantRow) {
t.Errorf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
}
// Check cell offset / limit
r, err = tbl.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
if err != nil {
t.Fatalf("Reading row: %v", err)
}
wantRow = Row{"ts": []ReadItem{
{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
}}
if !reflect.DeepEqual(r, wantRow) {
t.Errorf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow)
}
r, err = tbl.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3)))
if err != nil {
t.Fatalf("Reading row: %v", err)
}
wantRow = Row{"ts": []ReadItem{
{Row: "testrow", Column: "ts:col", Timestamp: 0, Value: []byte("val-0")},
}}
if !reflect.DeepEqual(r, wantRow) {
t.Errorf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow)
}
// Check timestamp range filtering (with truncation)
r, err = tbl.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000)))
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions bigtable/bttest/inmem.go
Expand Up @@ -459,6 +459,33 @@ func filterRow(f *btpb.RowFilter, r *row) bool {
if !rx.MatchString(r.key) {
return false
}
case *btpb.RowFilter_CellsPerRowLimitFilter:
// Grab the first n cells in the row.
lim := int(f.CellsPerRowLimitFilter)
for _, fam := range r.families {
for col, cs := range fam.cells {
if len(cs) > lim {
fam.cells[col] = cs[:lim]
return true
}
lim -= len(cs)
}
}
return true
case *btpb.RowFilter_CellsPerRowOffsetFilter:
// Skip the first n cells in the row.
offset := int(f.CellsPerRowOffsetFilter)
for _, fam := range r.families {
for col, cs := range fam.cells {
if offset > 0 && offset < len(cs) {
fam.cells[col] = cs[offset:]
offset = 0
} else {
offset -= len(cs)
}
}
}
return true
}

// Any other case, operate on a per-cell basis.
Expand Down
30 changes: 30 additions & 0 deletions bigtable/filter.go
Expand Up @@ -285,4 +285,34 @@ func (cf conditionFilter) proto() *btpb.RowFilter {
}}}
}

// CellsPerRowOffsetFilter returns a filter that drops the first n cells of
// every row and matches all of the cells that follow them.
//
// The count is stored as a cellsPerRowOffsetFilter value, so n is narrowed
// by the Go conversion to that type.
func CellsPerRowOffsetFilter(n int) Filter {
	offset := cellsPerRowOffsetFilter(n)
	return offset
}

// cellsPerRowOffsetFilter is the number of leading cells to skip in each row.
type cellsPerRowOffsetFilter int32

// String renders the filter as "cells_per_row_offset(N)", where N is the
// decimal offset.
func (cof cellsPerRowOffsetFilter) String() string {
	// Format the underlying int32 so the number is printed in plain decimal.
	count := fmt.Sprint(int32(cof))
	return "cells_per_row_offset(" + count + ")"
}

// proto converts the filter into a btpb.RowFilter whose Filter field is the
// cells-per-row-offset variant carrying the offset as an int32.
func (cof cellsPerRowOffsetFilter) proto() *btpb.RowFilter {
	variant := &btpb.RowFilter_CellsPerRowOffsetFilter{
		CellsPerRowOffsetFilter: int32(cof),
	}
	return &btpb.RowFilter{Filter: variant}
}

// CellsPerRowLimitFilter returns a filter that matches only the first n
// cells of every row.
//
// The count is stored as a cellsPerRowLimitFilter value, so n is narrowed
// by the Go conversion to that type.
func CellsPerRowLimitFilter(n int) Filter {
	limit := cellsPerRowLimitFilter(n)
	return limit
}

// cellsPerRowLimitFilter is the maximum number of leading cells to keep in
// each row.
type cellsPerRowLimitFilter int32

// String renders the filter as "cells_per_row_limit(N)", where N is the
// decimal limit.
func (clf cellsPerRowLimitFilter) String() string {
	// Format the underlying int32 so the number is printed in plain decimal.
	count := fmt.Sprint(int32(clf))
	return "cells_per_row_limit(" + count + ")"
}

// proto converts the filter into a btpb.RowFilter whose Filter field is the
// cells-per-row-limit variant carrying the limit as an int32.
func (clf cellsPerRowLimitFilter) proto() *btpb.RowFilter {
	variant := &btpb.RowFilter_CellsPerRowLimitFilter{
		CellsPerRowLimitFilter: int32(clf),
	}
	return &btpb.RowFilter{Filter: variant}
}

// TODO(dsymonds): More filters: sampling

0 comments on commit 9a38b47

Please sign in to comment.