Skip to content

Commit

Permalink
fix(bttest): fix race condition in SampleRowKeys (#4207)
Browse files Browse the repository at this point in the history
Fixes mutex locking/unlocking and adds a test for concurrent operations.
  • Loading branch information
coryan authored Jun 3, 2021
1 parent fdf29be commit 5711fb1
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 1 deletion.
4 changes: 3 additions & 1 deletion bigtable/bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,14 @@ func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColu

func (s *server) DropRowRange(ctx context.Context, req *btapb.DropRowRangeRequest) (*emptypb.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
tbl, ok := s.tables[req.Name]
s.mu.Unlock()
if !ok {
return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
}

tbl.mu.Lock()
defer tbl.mu.Unlock()
if req.GetDeleteAllDataFromTable() {
tbl.rows = btree.New(btreeDegree)
} else {
Expand Down
70 changes: 70 additions & 0 deletions bigtable/bttest/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,76 @@ func TestSampleRowKeys(t *testing.T) {
}
}

func TestTableRowsConcurrent(t *testing.T) {
s := &server{
tables: make(map[string]*table),
}
ctx := context.Background()
newTbl := btapb.Table{
ColumnFamilies: map[string]*btapb.ColumnFamily{
"cf": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 1}}},
},
}
tbl, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl})
if err != nil {
t.Fatalf("Creating table: %v", err)
}

// Populate the table
populate := func() {
rowCount := 100
for i := 0; i < rowCount; i++ {
req := &btpb.MutateRowRequest{
TableName: tbl.Name,
RowKey: []byte("row-" + strconv.Itoa(i)),
Mutations: []*btpb.Mutation{{
Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{
FamilyName: "cf",
ColumnQualifier: []byte("col"),
TimestampMicros: 1000,
Value: []byte("value"),
}},
}},
}
if _, err := s.MutateRow(ctx, req); err != nil {
t.Fatalf("Populating table: %v", err)
}
}
}

attempts := 500
finished := make(chan bool)
go func() {
populate()
mock := &MockSampleRowKeysServer{}
for i := 0; i < attempts; i++ {
if err := s.SampleRowKeys(&btpb.SampleRowKeysRequest{TableName: tbl.Name}, mock); err != nil {
t.Errorf("SampleRowKeys error: %v", err)
}
}
finished <- true
}()
go func() {
for i := 0; i < attempts; i++ {
req := &btapb.DropRowRangeRequest{
Name: tbl.Name,
Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{DeleteAllDataFromTable: true},
}
if _, err = s.DropRowRange(ctx, req); err != nil {
t.Fatalf("Dropping all rows: %v", err)
}
}
finished <- true
}()
for i := 0; i < 2; i++ {
select {
case <-finished:
case <-time.After(2 * time.Second):
t.Fatalf("Timeout waiting for task %d\n", i)
}
}
}

func TestDropRowRange(t *testing.T) {
s := &server{
tables: make(map[string]*table),
Expand Down

0 comments on commit 5711fb1

Please sign in to comment.