Skip to content

Commit

Permalink
Merge pull request #106 from storey247/ds/fixmountdirectoryerror
Browse files Browse the repository at this point in the history
Fix issue where file system mount to invalid directory caused plan to fail
  • Loading branch information
stikkireddy committed Jun 16, 2020
2 parents 8b42563 + cf84787 commit 1ca9c22
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 50 deletions.
5 changes: 5 additions & 0 deletions client/service/commands.go
Expand Up @@ -16,6 +16,11 @@ type CommandsAPI struct {
Client *DBApiClient
}

// CommandExecutor creates a spark context and executes a command and then closes context
type CommandExecutor interface {
Execute(clusterID, language, commandStr string) (model.Command, error)
}

// Execute creates a spark context and executes a command and then closes context
func (a CommandsAPI) Execute(clusterID, language, commandStr string) (model.Command, error) {
var resp model.Command
Expand Down
3 changes: 3 additions & 0 deletions client/service/commands_test.go
Expand Up @@ -7,6 +7,9 @@ import (
"github.com/databrickslabs/databricks-terraform/client/model"
)

// Test interface compliance
var _ CommandExecutor = (*CommandsAPI)(nil)

func TestCommandsAPI_Execute(t *testing.T) {
type context struct {
Language string `json:"language,omitempty"`
Expand Down
69 changes: 33 additions & 36 deletions databricks/mounts.go
Expand Up @@ -30,13 +30,13 @@ func NewAWSIamMount(s3BucketName string, mountName string) *AWSIamMount {
}

// Create creates an aws iam mount given a cluster ID
func (m AWSIamMount) Create(client *service.DBApiClient, clusterID string) error {
func (m AWSIamMount) Create(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.mount("s3a://%s", "/mnt/%s")
dbutils.fs.ls("/mnt/%s")
dbutils.notebook.exit("success")
`, m.S3BucketName, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -48,13 +48,13 @@ dbutils.notebook.exit("success")
}

// Delete deletes an aws iam mount given a cluster ID
func (m AWSIamMount) Delete(client *service.DBApiClient, clusterID string) error {
func (m AWSIamMount) Delete(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.unmount("/mnt/%s")
dbutils.fs.refreshMounts()
dbutils.notebook.exit("success")
`, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -66,14 +66,14 @@ dbutils.notebook.exit("success")
}

// Read verifies an aws iam mount given a cluster ID
func (m AWSIamMount) Read(client *service.DBApiClient, clusterID string) (string, error) {
func (m AWSIamMount) Read(exec service.CommandExecutor, clusterID string) (string, error) {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.ls("/mnt/%s")
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func NewAzureBlobMount(containerName string, storageAccountName string, director
}

// Create creates a azure blob storage mount given a cluster id
func (m AzureBlobMount) Create(client *service.DBApiClient, clusterID string) error {
func (m AzureBlobMount) Create(exec service.CommandExecutor, clusterID string) error {
var confKey string

if m.AuthType == "SAS" {
Expand All @@ -133,7 +133,7 @@ except Exception as e:
raise e
dbutils.notebook.exit("success")
`, m.ContainerName, m.StorageAccountName, m.Directory, m.MountName, confKey, m.SecretScope, m.SecretKey)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -145,13 +145,13 @@ dbutils.notebook.exit("success")
}

// Delete deletes a azure blob storage mount given a cluster id
func (m AzureBlobMount) Delete(client *service.DBApiClient, clusterID string) error {
func (m AzureBlobMount) Delete(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.unmount("/mnt/%s")
dbutils.fs.refreshMounts()
dbutils.notebook.exit("success")
`, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -163,14 +163,13 @@ dbutils.notebook.exit("success")
}

// Read verifies a azure blob storage mount given a cluster id
func (m AzureBlobMount) Read(client *service.DBApiClient, clusterID string) (string, error) {
func (m AzureBlobMount) Read(exec service.CommandExecutor, clusterID string) (string, error) {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.ls("/mnt/%s")
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -214,7 +213,7 @@ func NewAzureADLSGen1Mount(storageResource string, directory string, mountName s
}

// Create creates a azure datalake gen 1 storage mount given a cluster id
func (m AzureADLSGen1Mount) Create(client *service.DBApiClient, clusterID string) error {
func (m AzureADLSGen1Mount) Create(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%[8]s" and mount.source=="adl://%[6]s.azuredatalakestore.net%[7]s":
Expand All @@ -235,7 +234,7 @@ except Exception as e:
raise e
dbutils.notebook.exit("success")
`, m.PrefixType, m.ClientID, m.SecretScope, m.SecretKey, m.TenantID, m.StorageResource, m.Directory, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -247,13 +246,13 @@ dbutils.notebook.exit("success")
}

// Delete deletes a azure datalake gen 1 storage mount given a cluster id
func (m AzureADLSGen1Mount) Delete(client *service.DBApiClient, clusterID string) error {
func (m AzureADLSGen1Mount) Delete(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.unmount("/mnt/%s")
dbutils.fs.refreshMounts()
dbutils.notebook.exit("success")
`, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -265,14 +264,13 @@ dbutils.notebook.exit("success")
}

// Read verifies the azure datalake gen 1 storage mount given a cluster id
func (m AzureADLSGen1Mount) Read(client *service.DBApiClient, clusterID string) (string, error) {
func (m AzureADLSGen1Mount) Read(exec service.CommandExecutor, clusterID string) (string, error) {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.ls("/mnt/%s")
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -316,7 +314,7 @@ func NewAzureADLSGen2Mount(containerName string, storageAccountName string, dire
}

// Create creates a azure datalake gen 2 storage mount
func (m AzureADLSGen2Mount) Create(client *service.DBApiClient, clusterID string) error {
func (m AzureADLSGen2Mount) Create(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%[9]s" and mount.source=="abfss://%[6]s@%[7]s.dfs.core.windows.net%[8]s":
Expand All @@ -342,7 +340,7 @@ except Exception as e:
raise e
dbutils.notebook.exit("success")
`, m.ClientID, m.SecretScope, m.SecretKey, m.TenantID, m.InitializeFileSystem, m.ContainerName, m.StorageAccountName, m.Directory, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -354,13 +352,13 @@ dbutils.notebook.exit("success")
}

// Delete deletes a azure datalake gen 2 storage mount
func (m AzureADLSGen2Mount) Delete(client *service.DBApiClient, clusterID string) error {
func (m AzureADLSGen2Mount) Delete(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.unmount("/mnt/%s")
dbutils.fs.refreshMounts()
dbutils.notebook.exit("success")
`, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -372,14 +370,13 @@ dbutils.notebook.exit("success")
}

// Read verifies the azure datalake gen 2 storage mount
func (m AzureADLSGen2Mount) Read(client *service.DBApiClient, clusterID string) (string, error) {
func (m AzureADLSGen2Mount) Read(exec service.CommandExecutor, clusterID string) (string, error) {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.ls("/mnt/%s")
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return "", err
}
Expand Down

0 comments on commit 1ca9c22

Please sign in to comment.