Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where file system mount to invalid directory caused plan to fail #106

Merged
merged 5 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions client/service/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ type CommandsAPI struct {
Client *DBApiClient
}

// CommandExecutor creates a spark context and executes a command and then closes context
type CommandExecutor interface {
Execute(clusterID, language, commandStr string) (model.Command, error)
stikkireddy marked this conversation as resolved.
Show resolved Hide resolved
}

// Execute creates a spark context and executes a command and then closes context
func (a CommandsAPI) Execute(clusterID, language, commandStr string) (model.Command, error) {
var resp model.Command
Expand Down
69 changes: 33 additions & 36 deletions databricks/mounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ func NewAWSIamMount(s3BucketName string, mountName string) *AWSIamMount {
}

// Create creates an aws iam mount given a cluster ID
func (m AWSIamMount) Create(client *service.DBApiClient, clusterID string) error {
func (m AWSIamMount) Create(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.mount("s3a://%s", "/mnt/%s")
dbutils.fs.ls("/mnt/%s")
dbutils.notebook.exit("success")
`, m.S3BucketName, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -48,13 +48,13 @@ dbutils.notebook.exit("success")
}

// Delete deletes an aws iam mount given a cluster ID
func (m AWSIamMount) Delete(client *service.DBApiClient, clusterID string) error {
func (m AWSIamMount) Delete(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.unmount("/mnt/%s")
dbutils.fs.refreshMounts()
dbutils.notebook.exit("success")
`, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -66,14 +66,14 @@ dbutils.notebook.exit("success")
}

// Read verifies an aws iam mount given a cluster ID
func (m AWSIamMount) Read(client *service.DBApiClient, clusterID string) (string, error) {
func (m AWSIamMount) Read(exec service.CommandExecutor, clusterID string) (string, error) {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.ls("/mnt/%s")
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func NewAzureBlobMount(containerName string, storageAccountName string, director
}

// Create creates a azure blob storage mount given a cluster id
func (m AzureBlobMount) Create(client *service.DBApiClient, clusterID string) error {
func (m AzureBlobMount) Create(exec service.CommandExecutor, clusterID string) error {
var confKey string

if m.AuthType == "SAS" {
Expand All @@ -133,7 +133,7 @@ except Exception as e:
raise e
dbutils.notebook.exit("success")
`, m.ContainerName, m.StorageAccountName, m.Directory, m.MountName, confKey, m.SecretScope, m.SecretKey)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -145,13 +145,13 @@ dbutils.notebook.exit("success")
}

// Delete deletes a azure blob storage mount given a cluster id
func (m AzureBlobMount) Delete(client *service.DBApiClient, clusterID string) error {
func (m AzureBlobMount) Delete(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.unmount("/mnt/%s")
dbutils.fs.refreshMounts()
dbutils.notebook.exit("success")
`, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -163,14 +163,13 @@ dbutils.notebook.exit("success")
}

// Read verifies a azure blob storage mount given a cluster id
func (m AzureBlobMount) Read(client *service.DBApiClient, clusterID string) (string, error) {
func (m AzureBlobMount) Read(exec service.CommandExecutor, clusterID string) (string, error) {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.ls("/mnt/%s")
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -214,7 +213,7 @@ func NewAzureADLSGen1Mount(storageResource string, directory string, mountName s
}

// Create creates a azure datalake gen 1 storage mount given a cluster id
func (m AzureADLSGen1Mount) Create(client *service.DBApiClient, clusterID string) error {
func (m AzureADLSGen1Mount) Create(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%[8]s" and mount.source=="adl://%[6]s.azuredatalakestore.net%[7]s":
Expand All @@ -235,7 +234,7 @@ except Exception as e:
raise e
dbutils.notebook.exit("success")
`, m.PrefixType, m.ClientID, m.SecretScope, m.SecretKey, m.TenantID, m.StorageResource, m.Directory, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -247,13 +246,13 @@ dbutils.notebook.exit("success")
}

// Delete deletes a azure datalake gen 1 storage mount given a cluster id
func (m AzureADLSGen1Mount) Delete(client *service.DBApiClient, clusterID string) error {
func (m AzureADLSGen1Mount) Delete(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.unmount("/mnt/%s")
dbutils.fs.refreshMounts()
dbutils.notebook.exit("success")
`, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -265,14 +264,13 @@ dbutils.notebook.exit("success")
}

// Read verifies the azure datalake gen 1 storage mount given a cluster id
func (m AzureADLSGen1Mount) Read(client *service.DBApiClient, clusterID string) (string, error) {
func (m AzureADLSGen1Mount) Read(exec service.CommandExecutor, clusterID string) (string, error) {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.ls("/mnt/%s")
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -316,7 +314,7 @@ func NewAzureADLSGen2Mount(containerName string, storageAccountName string, dire
}

// Create creates a azure datalake gen 2 storage mount
func (m AzureADLSGen2Mount) Create(client *service.DBApiClient, clusterID string) error {
func (m AzureADLSGen2Mount) Create(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%[9]s" and mount.source=="abfss://%[6]s@%[7]s.dfs.core.windows.net%[8]s":
Expand All @@ -342,7 +340,7 @@ except Exception as e:
raise e
dbutils.notebook.exit("success")
`, m.ClientID, m.SecretScope, m.SecretKey, m.TenantID, m.InitializeFileSystem, m.ContainerName, m.StorageAccountName, m.Directory, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -354,13 +352,13 @@ dbutils.notebook.exit("success")
}

// Delete deletes a azure datalake gen 2 storage mount
func (m AzureADLSGen2Mount) Delete(client *service.DBApiClient, clusterID string) error {
func (m AzureADLSGen2Mount) Delete(exec service.CommandExecutor, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.unmount("/mnt/%s")
dbutils.fs.refreshMounts()
dbutils.notebook.exit("success")
`, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
}
Expand All @@ -372,14 +370,13 @@ dbutils.notebook.exit("success")
}

// Read verifies the azure datalake gen 2 storage mount
func (m AzureADLSGen2Mount) Read(client *service.DBApiClient, clusterID string) (string, error) {
func (m AzureADLSGen2Mount) Read(exec service.CommandExecutor, clusterID string) (string, error) {
iamMountCommand := fmt.Sprintf(`
dbutils.fs.ls("/mnt/%s")
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
if mount.mountPoint == "/mnt/%s":
dbutils.notebook.exit(mount.source)
`, m.MountName)
resp, err := exec.Execute(clusterID, "python", iamMountCommand)
if err != nil {
return "", err
}
Expand Down