From f42ab0f8137f29be986f642220e1e896ac284fd7 Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Tue, 6 May 2025 11:41:02 +0200 Subject: [PATCH 1/5] acc: Use granular locking in testserver --- acceptance/internal/handlers.go | 12 ++++----- acceptance/internal/prepare_server.go | 5 ++++ libs/testserver/fake_workspace.go | 38 +++++++++++++++++++++++++-- libs/testserver/server.go | 37 +++++++++++++++----------- 4 files changed, 68 insertions(+), 24 deletions(-) diff --git a/acceptance/internal/handlers.go b/acceptance/internal/handlers.go index 1d2b6ab5ad..5a0259e324 100644 --- a/acceptance/internal/handlers.go +++ b/acceptance/internal/handlers.go @@ -220,7 +220,7 @@ func addDefaultHandlers(server *testserver.Server) { // Quality monitors: server.Handle("GET", "/api/2.1/unity-catalog/tables/{table_name}/monitor", func(req testserver.Request) any { - return testserver.MapGet(req.Workspace.Monitors, req.Vars["table_name"]) + return testserver.MapGet(req.Workspace, req.Workspace.Monitors, req.Vars["table_name"]) }) server.Handle("POST", "/api/2.1/unity-catalog/tables/{table_name}/monitor", func(req testserver.Request) any { @@ -232,13 +232,13 @@ func addDefaultHandlers(server *testserver.Server) { }) server.Handle("DELETE", "/api/2.1/unity-catalog/tables/{table_name}/monitor", func(req testserver.Request) any { - return testserver.MapDelete(req.Workspace.Monitors, req.Vars["table_name"]) + return testserver.MapDelete(req.Workspace, req.Workspace.Monitors, req.Vars["table_name"]) }) // Apps: server.Handle("GET", "/api/2.0/apps/{name}", func(req testserver.Request) any { - return testserver.MapGet(req.Workspace.Apps, req.Vars["name"]) + return testserver.MapGet(req.Workspace, req.Workspace.Apps, req.Vars["name"]) }) server.Handle("POST", "/api/2.0/apps", func(req testserver.Request) any { @@ -250,13 +250,13 @@ func addDefaultHandlers(server *testserver.Server) { }) server.Handle("DELETE", "/api/2.0/apps/{name}", func(req testserver.Request) any { - return testserver.MapDelete(req.Workspace.Apps, req.Vars["name"]) + return testserver.MapDelete(req.Workspace, req.Workspace.Apps, req.Vars["name"]) }) // Schemas: server.Handle("GET", "/api/2.1/unity-catalog/schemas/{full_name}", func(req testserver.Request) any { - return testserver.MapGet(req.Workspace.Schemas, req.Vars["full_name"]) + return testserver.MapGet(req.Workspace, req.Workspace.Schemas, req.Vars["full_name"]) }) server.Handle("POST", "/api/2.1/unity-catalog/schemas", func(req testserver.Request) any { @@ -268,6 +268,6 @@ func addDefaultHandlers(server *testserver.Server) { }) server.Handle("DELETE", "/api/2.1/unity-catalog/schemas/{full_name}", func(req testserver.Request) any { - return testserver.MapDelete(req.Workspace.Schemas, req.Vars["full_name"]) + return testserver.MapDelete(req.Workspace, req.Workspace.Schemas, req.Vars["full_name"]) }) } diff --git a/acceptance/internal/prepare_server.go b/acceptance/internal/prepare_server.go index 77b2195217..f92eae457b 100644 --- a/acceptance/internal/prepare_server.go +++ b/acceptance/internal/prepare_server.go @@ -90,6 +90,9 @@ func startDedicatedServer(t *testing.T, s.RequestCallback = func(request *testserver.Request) { req := getLoggedRequest(request, includeHeaders) reqJson, err := json.MarshalIndent(req, "", " ") + + defer s.LockUnlock()() + assert.NoErrorf(t, err, "Failed to json-encode: %#v", req) f, err := os.OpenFile(requestsPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) @@ -103,6 +106,8 @@ func startDedicatedServer(t *testing.T, if logRequests { s.ResponseCallback = func(request *testserver.Request, response *testserver.EncodedResponse) { + defer s.LockUnlock()() + t.Logf("%d %s %s\n%s\n%s", response.StatusCode, request.Method, request.URL, formatHeadersAndBody("> ", request.Headers, request.Body), diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index 16201343e2..9ede9d7fd1 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -8,6 +8,7 @@ import ( "sort" "strconv" "strings" + "sync" "github.com/databricks/databricks-sdk-go/service/apps" "github.com/databricks/databricks-sdk-go/service/catalog" @@ -19,6 +20,8 @@ import ( // FakeWorkspace holds a state of a workspace for acceptance tests. type FakeWorkspace struct { + mu sync.Mutex + directories map[string]bool files map[string][]byte // normally, ids are not sequential, but we make them sequential for deterministic diff @@ -31,8 +34,18 @@ type FakeWorkspace struct { Schemas map[string]catalog.SchemaInfo } +func (w *FakeWorkspace) LockUnlock() func() { + if w == nil { + panic("LockUnlock called on nil FakeWorkspace") + } + w.mu.Lock() + return func() { w.mu.Unlock() } +} + // Generic functions to handle map operations -func MapGet[T any](collection map[string]T, key string) Response { +func MapGet[T any](w *FakeWorkspace, collection map[string]T, key string) Response { + defer w.LockUnlock()() + value, ok := collection[key] if !ok { return Response{ @@ -44,7 +57,9 @@ func MapGet[T any](collection map[string]T, key string) Response { } } -func MapDelete[T any](collection map[string]T, key string) Response { +func MapDelete[T any](w *FakeWorkspace, collection map[string]T, key string) Response { + defer w.LockUnlock()() + _, ok := collection[key] if !ok { return Response{ @@ -72,6 +87,8 @@ func NewFakeWorkspace() *FakeWorkspace { } func (s *FakeWorkspace) WorkspaceGetStatus(path string) Response { + defer s.LockUnlock()() + if s.directories[path] { return Response{ Body: &workspace.ObjectInfo{ @@ -96,14 +113,17 @@ func (s *FakeWorkspace) WorkspaceGetStatus(path string) Response { } func (s *FakeWorkspace) WorkspaceMkdirs(request workspace.Mkdirs) { + defer s.LockUnlock()() s.directories[request.Path] = true } func (s *FakeWorkspace) WorkspaceExport(path string) []byte { + defer s.LockUnlock()() return s.files[path] } func (s *FakeWorkspace) WorkspaceDelete(path string, recursive bool) { + defer s.LockUnlock()() if !recursive { s.files[path] = nil } else { @@ -116,6 +136,8 @@ func (s *FakeWorkspace) WorkspaceDelete(path string, recursive bool) { } func (s *FakeWorkspace) WorkspaceFilesImportFile(filePath string, body []byte) { + defer s.LockUnlock()() + if !strings.HasPrefix(filePath, "/") { filePath = "/" + filePath } @@ -135,6 +157,8 @@ func (s *FakeWorkspace) WorkspaceFilesExportFile(path string) []byte { } func (s *FakeWorkspace) JobsCreate(request jobs.CreateJob) Response { + defer s.LockUnlock()() + jobId := s.nextJobId s.nextJobId++ @@ -158,6 +182,8 @@ func (s *FakeWorkspace) JobsCreate(request jobs.CreateJob) Response { } func (s *FakeWorkspace) JobsReset(request jobs.ResetJob) Response { + defer s.LockUnlock()() + jobId := request.JobId _, ok := s.jobs[request.JobId] @@ -179,6 +205,8 @@ func (s *FakeWorkspace) JobsReset(request jobs.ResetJob) Response { } func (s *FakeWorkspace) PipelinesCreate(r pipelines.PipelineSpec) Response { + defer s.LockUnlock()() + pipelineId := uuid.New().String() s.Pipelines[pipelineId] = r @@ -201,6 +229,8 @@ func (s *FakeWorkspace) JobsGet(jobId string) Response { } } + defer s.LockUnlock()() + job, ok := s.jobs[jobIdInt] if !ok { return Response{ @@ -214,6 +244,8 @@ func (s *FakeWorkspace) JobsGet(jobId string) Response { } func (s *FakeWorkspace) PipelinesGet(pipelineId string) Response { + defer s.LockUnlock()() + spec, ok := s.Pipelines[pipelineId] if !ok { return Response{ @@ -230,6 +262,8 @@ func (s *FakeWorkspace) PipelinesGet(pipelineId string) Response { } func (s *FakeWorkspace) JobsList() Response { + defer s.LockUnlock()() + list := make([]jobs.BaseJob, 0, len(s.jobs)) for _, job := range s.jobs { baseJob := jobs.BaseJob{} diff --git a/libs/testserver/server.go b/libs/testserver/server.go index feb907968f..85591416c8 100644 --- a/libs/testserver/server.go +++ b/libs/testserver/server.go @@ -24,7 +24,7 @@ type Server struct { t testutil.TestingT fakeWorkspaces map[string]*FakeWorkspace - mu *sync.Mutex + mu sync.Mutex RequestCallback func(request *Request) ResponseCallback func(request *Request, response *EncodedResponse) @@ -186,7 +186,6 @@ func New(t testutil.TestingT) *Server { Server: server, Router: router, t: t, - mu: &sync.Mutex{}, fakeWorkspaces: map[string]*FakeWorkspace{}, } @@ -232,26 +231,32 @@ Response.Body = '' return s } +func (s *Server) getWorkspaceForToken(token string) *FakeWorkspace { + if token == "" { + return nil + } + + defer s.LockUnlock()() + + if _, ok := s.fakeWorkspaces[token]; !ok { + s.fakeWorkspaces[token] = NewFakeWorkspace() + } + + return s.fakeWorkspaces[token] +} + +func (s *Server) LockUnlock() func() { + s.mu.Lock() + return func() { s.mu.Unlock() } +} + type HandlerFunc func(req Request) any func (s *Server) Handle(method, path string, handler HandlerFunc) { s.Router.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { - // For simplicity we process requests sequentially. It's fast enough because - // we don't do any IO except reading and writing request/response bodies. - s.mu.Lock() - defer s.mu.Unlock() - // Each test uses unique DATABRICKS_TOKEN, we simulate each token having // it's own fake fakeWorkspace to avoid interference between tests. - var fakeWorkspace *FakeWorkspace = nil - token := getToken(r) - if token != "" { - if _, ok := s.fakeWorkspaces[token]; !ok { - s.fakeWorkspaces[token] = NewFakeWorkspace() - } - - fakeWorkspace = s.fakeWorkspaces[token] - } + fakeWorkspace := s.getWorkspaceForToken(getToken(r)) request := NewRequest(s.t, r, fakeWorkspace) From 779605823b6d87143d97ac5d20694f158b14e1e5 Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Wed, 7 May 2025 11:28:25 +0200 Subject: [PATCH 2/5] add missing lock --- libs/testserver/fake_workspace.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index 9ede9d7fd1..d83a72a8e9 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -136,11 +136,12 @@ func (s *FakeWorkspace) WorkspaceDelete(path string, recursive bool) { } func (s *FakeWorkspace) WorkspaceFilesImportFile(filePath string, body []byte) { - defer s.LockUnlock()() - if !strings.HasPrefix(filePath, "/") { filePath = "/" + filePath } + + defer s.LockUnlock()() + s.files[filePath] = body // Add all directories in the path to the directories map @@ -153,6 +154,9 @@ func (s *FakeWorkspace) WorkspaceFilesExportFile(path string) []byte { if !strings.HasPrefix(path, "/") { path = "/" + path } + + defer s.LockUnlock()() + return s.files[path] } From f89f007ca2cdbb8e94f40c8482a68b203dbfd772 Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Wed, 7 May 2025 11:35:38 +0200 Subject: [PATCH 3/5] use dedicated mutex for output recording --- acceptance/acceptance_test.go | 6 +++++- acceptance/internal/prepare_server.go | 12 ++++++++---- libs/testserver/server.go | 8 ++------ 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/acceptance/acceptance_test.go b/acceptance/acceptance_test.go index 8571b12717..3ce129a7a6 100644 --- a/acceptance/acceptance_test.go +++ b/acceptance/acceptance_test.go @@ -18,6 +18,7 @@ import ( "slices" "sort" "strings" + "sync" "testing" "time" "unicode/utf8" @@ -400,7 +401,10 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont args := []string{"bash", "-euo", "pipefail", EntryPointScript} cmd := exec.CommandContext(ctx, args[0], args[1:]...) - cfg, user := internal.PrepareServerAndClient(t, config, LogRequests, tmpDir) + // This mutex is used to synchronoize recording requests + var serverMutex sync.Mutex + + cfg, user := internal.PrepareServerAndClient(t, config, LogRequests, tmpDir, &serverMutex) testdiff.PrepareReplacementsUser(t, &repls, user) testdiff.PrepareReplacementsWorkspaceConfig(t, &repls, cfg) diff --git a/acceptance/internal/prepare_server.go b/acceptance/internal/prepare_server.go index f92eae457b..aed73678b5 100644 --- a/acceptance/internal/prepare_server.go +++ b/acceptance/internal/prepare_server.go @@ -9,6 +9,7 @@ import ( "path/filepath" "slices" "strings" + "sync" "testing" "time" "unicode/utf8" @@ -39,7 +40,7 @@ func isTruePtr(value *bool) bool { return value != nil && *value } -func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, outputDir string) (*sdkconfig.Config, iam.User) { +func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, outputDir string, mu *sync.Mutex) (*sdkconfig.Config, iam.User) { cloudEnv := os.Getenv("CLOUD_ENV") // If we are running on a cloud environment, use the host configured in the @@ -68,7 +69,7 @@ func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, o }, TestUser } - host := startDedicatedServer(t, config.Server, recordRequests, logRequests, config.IncludeRequestHeaders, outputDir) + host := startDedicatedServer(t, config.Server, recordRequests, logRequests, config.IncludeRequestHeaders, outputDir, mu) return &sdkconfig.Config{ Host: host, @@ -82,6 +83,7 @@ func startDedicatedServer(t *testing.T, logRequests bool, includeHeaders []string, outputDir string, + mu *sync.Mutex, ) string { s := testserver.New(t) @@ -91,7 +93,8 @@ func startDedicatedServer(t *testing.T, req := getLoggedRequest(request, includeHeaders) reqJson, err := json.MarshalIndent(req, "", " ") - defer s.LockUnlock()() + mu.Lock() + defer mu.Unlock() assert.NoErrorf(t, err, "Failed to json-encode: %#v", req) @@ -106,7 +109,8 @@ func startDedicatedServer(t *testing.T, if logRequests { s.ResponseCallback = func(request *testserver.Request, response *testserver.EncodedResponse) { - defer s.LockUnlock()() + mu.Lock() + defer mu.Unlock() t.Logf("%d %s %s\n%s\n%s", response.StatusCode, request.Method, request.URL, diff --git a/libs/testserver/server.go b/libs/testserver/server.go index 85591416c8..f118e59911 100644 --- a/libs/testserver/server.go +++ b/libs/testserver/server.go @@ -236,7 +236,8 @@ func (s *Server) getWorkspaceForToken(token string) *FakeWorkspace { return nil } - defer s.LockUnlock()() + s.mu.Lock() + defer s.mu.Unlock() if _, ok := s.fakeWorkspaces[token]; !ok { s.fakeWorkspaces[token] = NewFakeWorkspace() @@ -245,11 +246,6 @@ func (s *Server) getWorkspaceForToken(token string) *FakeWorkspace { return s.fakeWorkspaces[token] } -func (s *Server) LockUnlock() func() { - s.mu.Lock() - return func() { s.mu.Unlock() } -} - type HandlerFunc func(req Request) any func (s *Server) Handle(method, path string, handler HandlerFunc) { From 40129e805ffebdd8cd3e8b4c80a94e6a970841a6 Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Wed, 7 May 2025 11:54:42 +0200 Subject: [PATCH 4/5] fix typo --- acceptance/acceptance_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/acceptance/acceptance_test.go b/acceptance/acceptance_test.go index 3ce129a7a6..e1a359e7ee 100644 --- a/acceptance/acceptance_test.go +++ b/acceptance/acceptance_test.go @@ -401,7 +401,7 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont args := []string{"bash", "-euo", "pipefail", EntryPointScript} cmd := exec.CommandContext(ctx, args[0], args[1:]...) - // This mutex is used to synchronoize recording requests + // This mutex is used to synchronize recording requests var serverMutex sync.Mutex cfg, user := internal.PrepareServerAndClient(t, config, LogRequests, tmpDir, &serverMutex) From 6b8f38a2b5243e492ac23cfab1a147bbc785ede1 Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Wed, 7 May 2025 12:10:27 +0200 Subject: [PATCH 5/5] rm unnecessary lock, Logf is threadsafe --- acceptance/internal/prepare_server.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/acceptance/internal/prepare_server.go b/acceptance/internal/prepare_server.go index aed73678b5..f9cd03216c 100644 --- a/acceptance/internal/prepare_server.go +++ b/acceptance/internal/prepare_server.go @@ -109,9 +109,6 @@ func startDedicatedServer(t *testing.T, if logRequests { s.ResponseCallback = func(request *testserver.Request, response *testserver.EncodedResponse) { - mu.Lock() - defer mu.Unlock() - t.Logf("%d %s %s\n%s\n%s", response.StatusCode, request.Method, request.URL, formatHeadersAndBody("> ", request.Headers, request.Body),