Skip to content

Commit

Permalink
Merge pull request #87 from storey247/ds/fix-mountclustermissing
Browse files Browse the repository at this point in the history
mounts now handle deleted cluster correctly
  • Loading branch information
stikkireddy committed Jun 11, 2020
2 parents 6137b83 + 164ea0b commit b8b4d86
Show file tree
Hide file tree
Showing 12 changed files with 195 additions and 40 deletions.
42 changes: 28 additions & 14 deletions databricks/mounts.go
Expand Up @@ -117,16 +117,21 @@ func (m AzureBlobMount) Create(client *service.DBApiClient, clusterID string) er
confKey = fmt.Sprintf("fs.azure.account.key.%s.blob.core.windows.net", m.StorageAccountName)
}
iamMountCommand := fmt.Sprintf(`
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%[4]s" and mount.source=="wasbs://%[1]s@%[2]s.blob.core.windows.net%[3]s":
print ("Mount already exists")
dbutils.notebook.exit("success")
try:
dbutils.fs.mount(
source = "wasbs://%s@%s.blob.core.windows.net%s",
mount_point = "/mnt/%s",
extra_configs = {"%s":dbutils.secrets.get(scope = "%s", key = "%s")})
source = "wasbs://%[1]s@%[2]s.blob.core.windows.net%[3]s",
mount_point = "/mnt/%[4]s",
extra_configs = {"%[5]s":dbutils.secrets.get(scope = "%[6]s", key = "%[7]s")})
except Exception as e:
dbutils.fs.unmount("/mnt/%s")
dbutils.fs.unmount("/mnt/%[4]s")
raise e
dbutils.notebook.exit("success")
`, m.ContainerName, m.StorageAccountName, m.Directory, m.MountName, confKey, m.SecretScope, m.SecretKey, m.MountName)
`, m.ContainerName, m.StorageAccountName, m.Directory, m.MountName, confKey, m.SecretScope, m.SecretKey)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
Expand Down Expand Up @@ -210,21 +215,25 @@ func NewAzureADLSGen1Mount(storageResource string, directory string, mountName s
// Create creates an Azure Data Lake Gen 1 storage mount given a cluster id
func (m AzureADLSGen1Mount) Create(client *service.DBApiClient, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%[8]s" and mount.source=="adl://%[6]s.azuredatalakestore.net%[7]s":
print ("Mount already exists")
dbutils.notebook.exit("success")
try:
configs = {"%s.oauth2.access.token.provider.type": "ClientCredential",
"%s.oauth2.client.id": "%s",
"%s.oauth2.credential": dbutils.secrets.get(scope = "%s", key = "%s"),
"%s.oauth2.refresh.url": "https://login.microsoftonline.com/%s/oauth2/token"}
configs = {"%[1]s.oauth2.access.token.provider.type": "ClientCredential",
"%[1]s.oauth2.client.id": "%[2]s",
"%[1]s.oauth2.credential": dbutils.secrets.get(scope = "%[3]s", key = "%[4]s"),
"%[1]s.oauth2.refresh.url": "https://login.microsoftonline.com/%[5]s/oauth2/token"}
dbutils.fs.mount(
source = "adl://%s.azuredatalakestore.net%s",
mount_point = "/mnt/%s",
source = "adl://%[6]s.azuredatalakestore.net%[7]s",
mount_point = "/mnt/%[8]s",
extra_configs = configs)
except Exception as e:
dbutils.fs.unmount("/mnt/%s")
dbutils.fs.unmount("/mnt/%[8]s")
raise e
dbutils.notebook.exit("success")
`, m.PrefixType, m.PrefixType, m.ClientID, m.PrefixType, m.SecretScope, m.SecretKey, m.PrefixType, m.TenantID,
m.StorageResource, m.Directory, m.MountName, m.MountName)
`, m.PrefixType, m.ClientID, m.SecretScope, m.SecretKey, m.TenantID, m.StorageResource, m.Directory, m.MountName, m.MountName)
resp, err := client.Commands().Execute(clusterID, "python", iamMountCommand)
if err != nil {
return err
Expand Down Expand Up @@ -308,6 +317,11 @@ func NewAzureADLSGen2Mount(containerName string, storageAccountName string, dire
// Create creates an Azure Data Lake Gen 2 storage mount
func (m AzureADLSGen2Mount) Create(client *service.DBApiClient, clusterID string) error {
iamMountCommand := fmt.Sprintf(`
for mount in dbutils.fs.mounts():
if mount.mountPoint == "/mnt/%[9]s" and mount.source=="abfss://%[6]s@%[7]s.dfs.core.windows.net%[8]s":
print ("Mount already exists")
dbutils.notebook.exit("success")
try:
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
Expand Down
17 changes: 12 additions & 5 deletions databricks/resource_databricks_azure_adls_gen1_mount.go
Expand Up @@ -2,10 +2,12 @@ package databricks

import (
"fmt"
"log"
"strings"

"github.com/databrickslabs/databricks-terraform/client/service"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"
"strings"
)

func resourceAzureAdlsGen1Mount() *schema.Resource {
Expand Down Expand Up @@ -135,10 +137,6 @@ func resourceAzureAdlsGen1Create(d *schema.ResourceData, m interface{}) error {
func resourceAzureAdlsGen1Read(d *schema.ResourceData, m interface{}) error {
client := m.(*service.DBApiClient)
clusterID := d.Get("cluster_id").(string)
err := changeClusterIntoRunningState(clusterID, client)
if err != nil {
return err
}
storageResourceName := d.Get("storage_resource_name").(string)
sparkConfPrefix := d.Get("spark_conf_prefix").(string)
directory := d.Get("directory").(string)
Expand All @@ -147,6 +145,15 @@ func resourceAzureAdlsGen1Read(d *schema.ResourceData, m interface{}) error {
clientID := d.Get("client_id").(string)
clientSecretScope := d.Get("client_secret_scope").(string)
clientSecretKey := d.Get("client_secret_key").(string)
err := changeClusterIntoRunningState(clusterID, client)
if err != nil {
if isClusterMissing(err.Error(), clusterID) {
log.Printf("Unable to refresh mount '%s' as cluster '%s' is missing", mountName, clusterID)
d.SetId("")
return nil
}
return err
}

adlsGen1Mount := NewAzureADLSGen1Mount(storageResourceName, directory, mountName,
sparkConfPrefix, clientID, tenantID, clientSecretScope, clientSecretKey)
Expand Down
18 changes: 13 additions & 5 deletions databricks/resource_databricks_azure_adls_gen2_mount.go
Expand Up @@ -2,9 +2,11 @@ package databricks

import (
"fmt"
"log"
"strings"

"github.com/databrickslabs/databricks-terraform/client/service"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"strings"
)

func resourceAzureAdlsGen2Mount() *schema.Resource {
Expand Down Expand Up @@ -129,10 +131,6 @@ func resourceAzureAdlsGen2Create(d *schema.ResourceData, m interface{}) error {
func resourceAzureAdlsGen2Read(d *schema.ResourceData, m interface{}) error {
client := m.(*service.DBApiClient)
clusterID := d.Get("cluster_id").(string)
err := changeClusterIntoRunningState(clusterID, client)
if err != nil {
return err
}
containerName := d.Get("container_name").(string)
storageAccountName := d.Get("storage_account_name").(string)
directory := d.Get("directory").(string)
Expand All @@ -143,6 +141,16 @@ func resourceAzureAdlsGen2Read(d *schema.ResourceData, m interface{}) error {
clientSecretKey := d.Get("client_secret_key").(string)
initializeFileSystem := d.Get("initialize_file_system").(bool)

err := changeClusterIntoRunningState(clusterID, client)
if err != nil {
if isClusterMissing(err.Error(), clusterID) {
log.Printf("Unable to refresh mount '%s' as cluster '%s' is missing", mountName, clusterID)
d.SetId("")
return nil
}
return err
}

adlsGen2Mount := NewAzureADLSGen2Mount(containerName, storageAccountName, directory, mountName, clientID, tenantID,
clientSecretScope, clientSecretKey, initializeFileSystem)

Expand Down
47 changes: 47 additions & 0 deletions databricks/resource_databricks_azure_adls_gen2_mount_test.go
Expand Up @@ -6,7 +6,11 @@ import (
"regexp"
"testing"

"github.com/databrickslabs/databricks-terraform/client/model"
"github.com/databrickslabs/databricks-terraform/client/service"
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/terraform"
"github.com/stretchr/testify/assert"
)

func TestAccAzureAdlsGen2Mount_correctly_mounts(t *testing.T) {
Expand All @@ -22,6 +26,29 @@ func TestAccAzureAdlsGen2Mount_correctly_mounts(t *testing.T) {
})
}

func TestAccAzureAdlsGen2Mount_cluster_deleted_correctly_mounts(t *testing.T) {
terraformToApply := testAccAzureAdlsGen2Mount_correctly_mounts()
var cluster model.ClusterInfo

resource.Test(t, resource.TestCase{
Providers: testAccProviders,
Steps: []resource.TestStep{
{
Config: terraformToApply,
Check: testClusterResourceExists("databricks_cluster.cluster", &cluster, t),
},
{
PreConfig: func() {
client := testAccProvider.Meta().(*service.DBApiClient)
err := client.Clusters().Delete(cluster.ClusterID)
assert.NoError(t, err, err)
},
Config: terraformToApply,
},
},
})
}

func TestAccAzureAdlsGen2Mount_capture_error(t *testing.T) {
terraformToApply := testAccAzureAdlsGen2Mount_capture_error()

Expand Down Expand Up @@ -165,3 +192,23 @@ func testAccAzureAdlsGen2Mount_capture_error() string {
`, clientID, clientSecret, tenantID, subscriptionID, workspaceName, resourceGroupName, managedResourceGroupName, location, gen2AdalName)
return definition
}

// testClusterResourceExists returns a check function that looks up resource n
// in the root module of the Terraform state, fetches the cluster with that
// resource's primary ID from the API, and stores the result in *cluster.
// The check fails if n is absent from the state or the API call errors.
func testClusterResourceExists(n string, cluster *model.ClusterInfo, t *testing.T) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		// locate the state entry for the named resource
		resources := s.RootModule().Resources
		rs, found := resources[n]
		if !found {
			return fmt.Errorf("Not found: %s", n)
		}

		client := testAccProvider.Meta().(*service.DBApiClient)
		info, err := client.Clusters().Get(rs.Primary.ID)
		if err == nil {
			// only publish the cluster info when the lookup succeeded
			*cluster = info
		}
		return err
	}
}
18 changes: 13 additions & 5 deletions databricks/resource_databricks_azure_blob_mount.go
Expand Up @@ -2,10 +2,12 @@ package databricks

import (
"fmt"
"log"
"strings"

"github.com/databrickslabs/databricks-terraform/client/service"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"
"strings"
)

func resourceAzureBlobMount() *schema.Resource {
Expand Down Expand Up @@ -121,10 +123,6 @@ func resourceAzureBlobMountCreate(d *schema.ResourceData, m interface{}) error {
func resourceAzureBlobMountRead(d *schema.ResourceData, m interface{}) error {
client := m.(*service.DBApiClient)
clusterID := d.Get("cluster_id").(string)
err := changeClusterIntoRunningState(clusterID, client)
if err != nil {
return err
}
containerName := d.Get("container_name").(string)
storageAccountName := d.Get("storage_account_name").(string)
directory := d.Get("directory").(string)
Expand All @@ -133,6 +131,16 @@ func resourceAzureBlobMountRead(d *schema.ResourceData, m interface{}) error {
tokenSecretScope := d.Get("token_secret_scope").(string)
tokenSecretKey := d.Get("token_secret_key").(string)

err := changeClusterIntoRunningState(clusterID, client)
if err != nil {
if isClusterMissing(err.Error(), clusterID) {
log.Printf("Unable to refresh mount '%s' as cluster '%s' is missing", mountName, clusterID)
d.SetId("")
return nil
}
return err
}

blobMount := NewAzureBlobMount(containerName, storageAccountName, directory, mountName, authType,
tokenSecretScope, tokenSecretKey)

Expand Down
30 changes: 30 additions & 0 deletions databricks/resource_databricks_azure_blob_mount_test.go
Expand Up @@ -42,6 +42,36 @@ func TestAccAzureBlobMount_correctly_mounts(t *testing.T) {
})
}

func TestAccAzureBlobMount_cluster_deleted_correctly_mounts(t *testing.T) {
terraformToApply := testAccAzureBlobMount_correctly_mounts()
var clusterInfo model.ClusterInfo
var azureBlobMount AzureBlobMount

resource.Test(t, resource.TestCase{
Providers: testAccProviders,
Steps: []resource.TestStep{
{
Config: terraformToApply,
Check: resource.ComposeTestCheckFunc(
testAccAzureBlobMount_cluster_exists("databricks_cluster.cluster", &clusterInfo),
testAccAzureBlobMount_mount_exists("databricks_azure_blob_mount.mount", &azureBlobMount, &clusterInfo),
),
},
{
PreConfig: func() {
client := testAccProvider.Meta().(*service.DBApiClient)
err := client.Clusters().Delete(clusterInfo.ClusterID)
assert.NoError(t, err, err)
},
Config: terraformToApply,
Check: resource.ComposeTestCheckFunc(
testAccAzureBlobMount_mount_exists("databricks_azure_blob_mount.mount", &azureBlobMount, &clusterInfo),
),
},
},
})
}

func testAccAzureBlobMount_cluster_exists(n string, clusterInfo *model.ClusterInfo) resource.TestCheckFunc {
return func(s *terraform.State) error {
// find the corresponding state object
Expand Down
13 changes: 4 additions & 9 deletions databricks/resource_databricks_cluster.go
Expand Up @@ -2,13 +2,13 @@ package databricks

import (
"errors"
"fmt"
"github.com/databrickslabs/databricks-terraform/client/model"
"github.com/databrickslabs/databricks-terraform/client/service"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"log"
"strings"
"time"

"github.com/databrickslabs/databricks-terraform/client/model"
"github.com/databrickslabs/databricks-terraform/client/service"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
)

func resourceCluster() *schema.Resource {
Expand Down Expand Up @@ -1453,8 +1453,3 @@ func getMapFromOneItemSet(input interface{}) map[string]interface{} {
}
return nil
}

// isClusterMissing reports whether errorMsg contains both the
// INVALID_PARAMETER_VALUE error code and the "Cluster <resourceID> does not
// exist" text for the given cluster ID.
func isClusterMissing(errorMsg, resourceID string) bool {
	if !strings.Contains(errorMsg, "INVALID_PARAMETER_VALUE") {
		return false
	}
	notFound := fmt.Sprintf("Cluster %s does not exist", resourceID)
	return strings.Contains(errorMsg, notFound)
}
10 changes: 8 additions & 2 deletions databricks/utils.go
Expand Up @@ -2,10 +2,11 @@ package databricks

import (
"fmt"
"github.com/databrickslabs/databricks-terraform/client/model"
"github.com/databrickslabs/databricks-terraform/client/service"
"strings"
"time"

"github.com/databrickslabs/databricks-terraform/client/model"
"github.com/databrickslabs/databricks-terraform/client/service"
)

func changeClusterIntoRunningState(clusterID string, client *service.DBApiClient) error {
Expand Down Expand Up @@ -69,6 +70,11 @@ func changeClusterIntoRunningState(clusterID string, client *service.DBApiClient

}

// isClusterMissing reports whether errorMsg contains both the
// INVALID_PARAMETER_VALUE error code and the "Cluster <resourceID> does not
// exist" text for the given cluster ID.
func isClusterMissing(errorMsg, resourceID string) bool {
	if !strings.Contains(errorMsg, "INVALID_PARAMETER_VALUE") {
		return false
	}
	notFound := fmt.Sprintf("Cluster %s does not exist", resourceID)
	return strings.Contains(errorMsg, notFound)
}

// PackagedMWSIds is a struct that contains both the MWS acct id and the ResourceId (resources are networks, creds, etc.)
type PackagedMWSIds struct {
MwsAcctId string
Expand Down
31 changes: 31 additions & 0 deletions databricks/utils_test.go
@@ -0,0 +1,31 @@
package databricks

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestIsClusterMissingTrueWhenClusterIdSpecifiedPresent(t *testing.T) {
errorMessage := "{\"error_code\":\"INVALID_PARAMETER_VALUE\",\"message\":\"Cluster 123 does not exist\"}"

result := isClusterMissing(errorMessage, "123")

assert.True(t, result)
}

func TestIsClusterMissingFalseWhenClusterIdSpecifiedNotPresent(t *testing.T) {
errorMessage := "{\"error_code\":\"INVALID_PARAMETER_VALUE\",\"message\":\"Cluster 123 does not exist\"}"

result := isClusterMissing(errorMessage, "xyz")

assert.False(t, result)
}

func TestIsClusterMissingFalseWhenErrorNotInCorrectFormat(t *testing.T) {
errorMessage := "{\"error_code\":\"INVALID_PARAMETER_VALUE\",\"message\":\"Something random went bang xyz\"}"

result := isClusterMissing(errorMessage, "xyz")

assert.False(t, result)
}
3 changes: 3 additions & 0 deletions website/content/Resources/azure_adls_gen1_mount.md
Expand Up @@ -13,7 +13,10 @@ principal/enterprise ad application which will provide you a client id and clien

{{% notice warning %}}
It is important to understand that this will start up the cluster if the cluster is terminated.

The read and refresh terraform commands will require a cluster and may take some time to validate the mount.

If the cluster associated with the mount is deleted, then the mount will be re-created by terraform on next plan. It is important to note that provided the mount path and the storage account information remains the same, the mount will not actually get re-created inside the workspace.
{{% /notice %}}

{{% notice note %}}
Expand Down

0 comments on commit b8b4d86

Please sign in to comment.