Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion acceptance/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"slices"
"sort"
"strings"
"sync"
"testing"
"time"
"unicode/utf8"
Expand Down Expand Up @@ -400,7 +401,10 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont
args := []string{"bash", "-euo", "pipefail", EntryPointScript}
cmd := exec.CommandContext(ctx, args[0], args[1:]...)

cfg, user := internal.PrepareServerAndClient(t, config, LogRequests, tmpDir)
// This mutex is used to synchronize recording requests
var serverMutex sync.Mutex

cfg, user := internal.PrepareServerAndClient(t, config, LogRequests, tmpDir, &serverMutex)
testdiff.PrepareReplacementsUser(t, &repls, user)
testdiff.PrepareReplacementsWorkspaceConfig(t, &repls, cfg)

Expand Down
12 changes: 6 additions & 6 deletions acceptance/internal/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func addDefaultHandlers(server *testserver.Server) {
// Quality monitors:

server.Handle("GET", "/api/2.1/unity-catalog/tables/{table_name}/monitor", func(req testserver.Request) any {
return testserver.MapGet(req.Workspace.Monitors, req.Vars["table_name"])
return testserver.MapGet(req.Workspace, req.Workspace.Monitors, req.Vars["table_name"])
})

server.Handle("POST", "/api/2.1/unity-catalog/tables/{table_name}/monitor", func(req testserver.Request) any {
Expand All @@ -232,13 +232,13 @@ func addDefaultHandlers(server *testserver.Server) {
})

server.Handle("DELETE", "/api/2.1/unity-catalog/tables/{table_name}/monitor", func(req testserver.Request) any {
return testserver.MapDelete(req.Workspace.Monitors, req.Vars["table_name"])
return testserver.MapDelete(req.Workspace, req.Workspace.Monitors, req.Vars["table_name"])
})

// Apps:

server.Handle("GET", "/api/2.0/apps/{name}", func(req testserver.Request) any {
return testserver.MapGet(req.Workspace.Apps, req.Vars["name"])
return testserver.MapGet(req.Workspace, req.Workspace.Apps, req.Vars["name"])
})

server.Handle("POST", "/api/2.0/apps", func(req testserver.Request) any {
Expand All @@ -250,13 +250,13 @@ func addDefaultHandlers(server *testserver.Server) {
})

server.Handle("DELETE", "/api/2.0/apps/{name}", func(req testserver.Request) any {
return testserver.MapDelete(req.Workspace.Apps, req.Vars["name"])
return testserver.MapDelete(req.Workspace, req.Workspace.Apps, req.Vars["name"])
})

// Schemas:

server.Handle("GET", "/api/2.1/unity-catalog/schemas/{full_name}", func(req testserver.Request) any {
return testserver.MapGet(req.Workspace.Schemas, req.Vars["full_name"])
return testserver.MapGet(req.Workspace, req.Workspace.Schemas, req.Vars["full_name"])
})

server.Handle("POST", "/api/2.1/unity-catalog/schemas", func(req testserver.Request) any {
Expand All @@ -268,6 +268,6 @@ func addDefaultHandlers(server *testserver.Server) {
})

server.Handle("DELETE", "/api/2.1/unity-catalog/schemas/{full_name}", func(req testserver.Request) any {
return testserver.MapDelete(req.Workspace.Schemas, req.Vars["full_name"])
return testserver.MapDelete(req.Workspace, req.Workspace.Schemas, req.Vars["full_name"])
})
}
10 changes: 8 additions & 2 deletions acceptance/internal/prepare_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"slices"
"strings"
"sync"
"testing"
"time"
"unicode/utf8"
Expand Down Expand Up @@ -39,7 +40,7 @@ func isTruePtr(value *bool) bool {
return value != nil && *value
}

func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, outputDir string) (*sdkconfig.Config, iam.User) {
func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, outputDir string, mu *sync.Mutex) (*sdkconfig.Config, iam.User) {
cloudEnv := os.Getenv("CLOUD_ENV")

// If we are running on a cloud environment, use the host configured in the
Expand Down Expand Up @@ -68,7 +69,7 @@ func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, o
}, TestUser
}

host := startDedicatedServer(t, config.Server, recordRequests, logRequests, config.IncludeRequestHeaders, outputDir)
host := startDedicatedServer(t, config.Server, recordRequests, logRequests, config.IncludeRequestHeaders, outputDir, mu)

return &sdkconfig.Config{
Host: host,
Expand All @@ -82,6 +83,7 @@ func startDedicatedServer(t *testing.T,
logRequests bool,
includeHeaders []string,
outputDir string,
mu *sync.Mutex,
) string {
s := testserver.New(t)

Expand All @@ -90,6 +92,10 @@ func startDedicatedServer(t *testing.T,
s.RequestCallback = func(request *testserver.Request) {
req := getLoggedRequest(request, includeHeaders)
reqJson, err := json.MarshalIndent(req, "", " ")

mu.Lock()
defer mu.Unlock()

assert.NoErrorf(t, err, "Failed to json-encode: %#v", req)

f, err := os.OpenFile(requestsPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
Expand Down
42 changes: 40 additions & 2 deletions libs/testserver/fake_workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/databricks/databricks-sdk-go/service/apps"
"github.com/databricks/databricks-sdk-go/service/catalog"
Expand All @@ -19,6 +20,8 @@ import (

// FakeWorkspace holds a state of a workspace for acceptance tests.
type FakeWorkspace struct {
mu sync.Mutex

directories map[string]bool
files map[string][]byte
// normally, ids are not sequential, but we make them sequential for deterministic diff
Expand All @@ -31,8 +34,18 @@ type FakeWorkspace struct {
Schemas map[string]catalog.SchemaInfo
}

func (w *FakeWorkspace) LockUnlock() func() {
if w == nil {
panic("LockUnlock called on nil FakeWorkspace")
}
w.mu.Lock()
return func() { w.mu.Unlock() }
}

// Generic functions to handle map operations
func MapGet[T any](collection map[string]T, key string) Response {
func MapGet[T any](w *FakeWorkspace, collection map[string]T, key string) Response {
defer w.LockUnlock()()

value, ok := collection[key]
if !ok {
return Response{
Expand All @@ -44,7 +57,9 @@ func MapGet[T any](collection map[string]T, key string) Response {
}
}

func MapDelete[T any](collection map[string]T, key string) Response {
func MapDelete[T any](w *FakeWorkspace, collection map[string]T, key string) Response {
defer w.LockUnlock()()

_, ok := collection[key]
if !ok {
return Response{
Expand Down Expand Up @@ -72,6 +87,8 @@ func NewFakeWorkspace() *FakeWorkspace {
}

func (s *FakeWorkspace) WorkspaceGetStatus(path string) Response {
defer s.LockUnlock()()

if s.directories[path] {
return Response{
Body: &workspace.ObjectInfo{
Expand All @@ -96,14 +113,17 @@ func (s *FakeWorkspace) WorkspaceGetStatus(path string) Response {
}

func (s *FakeWorkspace) WorkspaceMkdirs(request workspace.Mkdirs) {
defer s.LockUnlock()()
s.directories[request.Path] = true
}

func (s *FakeWorkspace) WorkspaceExport(path string) []byte {
defer s.LockUnlock()()
return s.files[path]
}

func (s *FakeWorkspace) WorkspaceDelete(path string, recursive bool) {
defer s.LockUnlock()()
if !recursive {
s.files[path] = nil
} else {
Expand All @@ -119,6 +139,9 @@ func (s *FakeWorkspace) WorkspaceFilesImportFile(filePath string, body []byte) {
if !strings.HasPrefix(filePath, "/") {
filePath = "/" + filePath
}

defer s.LockUnlock()()

s.files[filePath] = body

// Add all directories in the path to the directories map
Expand All @@ -131,10 +154,15 @@ func (s *FakeWorkspace) WorkspaceFilesExportFile(path string) []byte {
if !strings.HasPrefix(path, "/") {
path = "/" + path
}

defer s.LockUnlock()()

return s.files[path]
}
Comment thread
denik marked this conversation as resolved.

func (s *FakeWorkspace) JobsCreate(request jobs.CreateJob) Response {
defer s.LockUnlock()()

jobId := s.nextJobId
s.nextJobId++

Expand All @@ -158,6 +186,8 @@ func (s *FakeWorkspace) JobsCreate(request jobs.CreateJob) Response {
}

func (s *FakeWorkspace) JobsReset(request jobs.ResetJob) Response {
defer s.LockUnlock()()

jobId := request.JobId

_, ok := s.jobs[request.JobId]
Expand All @@ -179,6 +209,8 @@ func (s *FakeWorkspace) JobsReset(request jobs.ResetJob) Response {
}

func (s *FakeWorkspace) PipelinesCreate(r pipelines.PipelineSpec) Response {
defer s.LockUnlock()()

pipelineId := uuid.New().String()

s.Pipelines[pipelineId] = r
Expand All @@ -201,6 +233,8 @@ func (s *FakeWorkspace) JobsGet(jobId string) Response {
}
}

defer s.LockUnlock()()

job, ok := s.jobs[jobIdInt]
if !ok {
return Response{
Expand All @@ -214,6 +248,8 @@ func (s *FakeWorkspace) JobsGet(jobId string) Response {
}

func (s *FakeWorkspace) PipelinesGet(pipelineId string) Response {
defer s.LockUnlock()()

spec, ok := s.Pipelines[pipelineId]
if !ok {
return Response{
Expand All @@ -230,6 +266,8 @@ func (s *FakeWorkspace) PipelinesGet(pipelineId string) Response {
}

func (s *FakeWorkspace) JobsList() Response {
defer s.LockUnlock()()

list := make([]jobs.BaseJob, 0, len(s.jobs))
for _, job := range s.jobs {
baseJob := jobs.BaseJob{}
Expand Down
33 changes: 17 additions & 16 deletions libs/testserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Server struct {
t testutil.TestingT

fakeWorkspaces map[string]*FakeWorkspace
mu *sync.Mutex
mu sync.Mutex

RequestCallback func(request *Request)
ResponseCallback func(request *Request, response *EncodedResponse)
Expand Down Expand Up @@ -186,7 +186,6 @@ func New(t testutil.TestingT) *Server {
Server: server,
Router: router,
t: t,
mu: &sync.Mutex{},
fakeWorkspaces: map[string]*FakeWorkspace{},
}

Expand Down Expand Up @@ -232,26 +231,28 @@ Response.Body = '<response body here>'
return s
}

func (s *Server) getWorkspaceForToken(token string) *FakeWorkspace {
if token == "" {
return nil
}

s.mu.Lock()
Comment thread
shreyas-goenka marked this conversation as resolved.
defer s.mu.Unlock()

if _, ok := s.fakeWorkspaces[token]; !ok {
s.fakeWorkspaces[token] = NewFakeWorkspace()
}

return s.fakeWorkspaces[token]
}

type HandlerFunc func(req Request) any

func (s *Server) Handle(method, path string, handler HandlerFunc) {
s.Router.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
// For simplicity we process requests sequentially. It's fast enough because
// we don't do any IO except reading and writing request/response bodies.
s.mu.Lock()
defer s.mu.Unlock()

// Each test uses unique DATABRICKS_TOKEN, we simulate each token having
// it's own fake fakeWorkspace to avoid interference between tests.
var fakeWorkspace *FakeWorkspace = nil
token := getToken(r)
if token != "" {
if _, ok := s.fakeWorkspaces[token]; !ok {
s.fakeWorkspaces[token] = NewFakeWorkspace()
}

fakeWorkspace = s.fakeWorkspaces[token]
}
fakeWorkspace := s.getWorkspaceForToken(getToken(r))

request := NewRequest(s.t, r, fakeWorkspace)

Expand Down
Loading