Skip to content

Commit

Permalink
Implement DBFS filer (#139)
Browse files Browse the repository at this point in the history
Adds a DBFS implementation of the `filer.Filer` interface.

The integration tests are reused between the workspace filesystem and
DBFS implementations to ensure identical behavior.
  • Loading branch information
pietern committed May 31, 2023
1 parent 92cb520 commit 27df4e7
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 63 deletions.
173 changes: 114 additions & 59 deletions internal/filer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package internal

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/service/files"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -28,74 +30,25 @@ func (f filerTest) assertContents(ctx context.Context, name string, contents str
return
}

body, err := io.ReadAll(reader)
var body bytes.Buffer
_, err = io.Copy(&body, reader)
if !assert.NoError(f, err) {
return
}

assert.Equal(f, contents, string(body))
assert.Equal(f, contents, body.String())
}

func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
ctx := context.Background()
me, err := w.CurrentUser.Me(ctx)
require.NoError(t, err)

path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("wsfs-files-"))

// Ensure directory exists, but doesn't exist YET!
// Otherwise we could inadvertently remove a directory that already exists on cleanup.
t.Logf("mkdir %s", path)
err = w.Workspace.MkdirsByPath(ctx, path)
require.NoError(t, err)

// Remove test directory on test completion.
t.Cleanup(func() {
t.Logf("rm -rf %s", path)
err := w.Workspace.Delete(ctx, workspace.Delete{
Path: path,
Recursive: true,
})
if err == nil || apierr.IsMissing(err) {
return
}
t.Logf("unable to remove temporary workspace path %s: %#v", path, err)
})

return path
}

func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))

ctx := context.Background()
w := databricks.Must(databricks.NewWorkspaceClient())
tmpdir := temporaryWorkspaceDir(t, w)
f, err := filer.NewWorkspaceFilesClient(w, tmpdir)
require.NoError(t, err)

// Check if we can use this API here, skip test if we cannot.
_, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled")
var aerr *apierr.APIError
if errors.As(err, &aerr) && aerr.StatusCode == http.StatusBadRequest {
t.Skip(aerr.Message)
}

return ctx, f
}

func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) {
func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
var err error

ctx, f := setupWorkspaceFilesTest(t)

// Write should fail because the root path doesn't yet exist.
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`))
assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))

// Read should fail because the root path doesn't yet exist.
_, err = f.Read(ctx, "/foo/bar")
assert.True(t, apierr.IsMissing(err))
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))

// Write with CreateParentDirectories flag should succeed.
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories)
Expand All @@ -113,18 +66,16 @@ func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) {

// Delete should fail if the file doesn't exist.
err = f.Delete(ctx, "/doesnt_exist")
assert.True(t, apierr.IsMissing(err))
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))

// Delete should succeed for file that does exist.
err = f.Delete(ctx, "/foo/bar")
assert.NoError(t, err)
}

func TestAccFilerWorkspaceFilesReadDir(t *testing.T) {
func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) {
var err error

ctx, f := setupWorkspaceFilesTest(t)

// We start with an empty directory.
entries, err := f.ReadDir(ctx, ".")
require.NoError(t, err)
Expand All @@ -148,7 +99,7 @@ func TestAccFilerWorkspaceFilesReadDir(t *testing.T) {

// Expect an error if the path doesn't exist.
_, err = f.ReadDir(ctx, "/dir/a/b/c/d/e")
assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))
assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}), err)

// Expect two entries in the root.
entries, err = f.ReadDir(ctx, ".")
Expand All @@ -172,3 +123,107 @@ func TestAccFilerWorkspaceFilesReadDir(t *testing.T) {
assert.Len(t, entries, 1)
assert.Equal(t, "c", entries[0].Name)
}

// temporaryWorkspaceDir creates a fresh, uniquely named directory under the
// current user's home folder in the workspace and returns its path. A cleanup
// hook removes the directory (recursively) once the test completes; a missing
// directory at cleanup time is not treated as a failure.
func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
	ctx := context.Background()
	user, err := w.CurrentUser.Me(ctx)
	require.NoError(t, err)

	dir := fmt.Sprintf("/Users/%s/%s", user.UserName, RandomName("integration-test-filer-wsfs-"))

	// Create the directory up front. Using a randomized name means it should
	// not exist yet, so the recursive delete below cannot clobber a
	// pre-existing directory.
	t.Logf("mkdir %s", dir)
	require.NoError(t, w.Workspace.MkdirsByPath(ctx, dir))

	// Tear down the directory when the test finishes.
	t.Cleanup(func() {
		t.Logf("rm -rf %s", dir)
		delErr := w.Workspace.Delete(ctx, workspace.Delete{
			Path:      dir,
			Recursive: true,
		})
		if delErr != nil && !apierr.IsMissing(delErr) {
			// Best effort only: log and move on rather than failing the test.
			t.Logf("unable to remove temporary workspace directory %s: %#v", dir, delErr)
		}
	})

	return dir
}

// setupWorkspaceFilesTest prepares a workspace-files filer rooted at a
// temporary workspace directory. It skips the test when no cloud environment
// is configured, or when the workspace files API is not enabled (detected by
// probing a read and checking for a 400 response).
func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) {
	t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))

	ctx := context.Background()
	wsc := databricks.Must(databricks.NewWorkspaceClient())
	rootDir := temporaryWorkspaceDir(t, wsc)
	wsf, err := filer.NewWorkspaceFilesClient(wsc, rootDir)
	require.NoError(t, err)

	// Probe the API with a read; a 400 means it is disabled here, so skip.
	_, err = wsf.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled")
	var apiErr *apierr.APIError
	if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusBadRequest {
		t.Skip(apiErr.Message)
	}

	return ctx, wsf
}

// TestAccFilerWorkspaceFilesReadWrite runs the shared read/write suite
// against the workspace files implementation of filer.Filer.
func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) {
	testCtx, testFiler := setupWorkspaceFilesTest(t)
	runFilerReadWriteTest(t, testCtx, testFiler)
}

// TestAccFilerWorkspaceFilesReadDir runs the shared directory-listing suite
// against the workspace files implementation of filer.Filer.
func TestAccFilerWorkspaceFilesReadDir(t *testing.T) {
	testCtx, testFiler := setupWorkspaceFilesTest(t)
	runFilerReadDirTest(t, testCtx, testFiler)
}

// temporaryDbfsDir creates a fresh, uniquely named directory under /tmp on
// DBFS and returns its path. A cleanup hook removes the directory
// (recursively) once the test completes; a missing directory at cleanup time
// is not treated as a failure.
func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string {
	ctx := context.Background()
	dir := fmt.Sprintf("/tmp/%s", RandomName("integration-test-filer-dbfs-"))

	// Mkdirs fails when the path already exists, so a collision surfaces
	// immediately rather than silently reusing another test's directory.
	t.Logf("mkdir dbfs:%s", dir)
	require.NoError(t, w.Dbfs.MkdirsByPath(ctx, dir))

	// Tear down the directory when the test finishes.
	t.Cleanup(func() {
		t.Logf("rm -rf dbfs:%s", dir)
		delErr := w.Dbfs.Delete(ctx, files.Delete{
			Path:      dir,
			Recursive: true,
		})
		if delErr != nil && !apierr.IsMissing(delErr) {
			// Best effort only: log and move on rather than failing the test.
			t.Logf("unable to remove temporary dbfs directory %s: %#v", dir, delErr)
		}
	})

	return dir
}

// setupFilerDbfsTest prepares a DBFS-backed filer rooted at a temporary DBFS
// directory. It skips the test when no cloud environment is configured.
func setupFilerDbfsTest(t *testing.T) (context.Context, filer.Filer) {
	t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))

	ctx := context.Background()
	wsc := databricks.Must(databricks.NewWorkspaceClient())
	rootDir := temporaryDbfsDir(t, wsc)
	dbfsFiler, err := filer.NewDbfsClient(wsc, rootDir)
	require.NoError(t, err)
	return ctx, dbfsFiler
}

// TestAccFilerDbfsReadWrite runs the shared read/write suite against the
// DBFS implementation of filer.Filer.
func TestAccFilerDbfsReadWrite(t *testing.T) {
	testCtx, testFiler := setupFilerDbfsTest(t)
	runFilerReadWriteTest(t, testCtx, testFiler)
}

// TestAccFilerDbfsReadDir runs the shared directory-listing suite against the
// DBFS implementation of filer.Filer.
func TestAccFilerDbfsReadDir(t *testing.T) {
	testCtx, testFiler := setupFilerDbfsTest(t)
	runFilerReadDirTest(t, testCtx, testFiler)
}
Loading

0 comments on commit 27df4e7

Please sign in to comment.