Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for glob patterns in pipeline libraries section #833

Merged
merged 5 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions bundle/config/mutator/expand_pipeline_glob_paths.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package mutator

import (
"context"
"fmt"
"path/filepath"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/databricks-sdk-go/service/pipelines"
)

type expandPipelineGlobPaths struct{}

func ExpandPipelineGlobPaths() bundle.Mutator {
return &expandPipelineGlobPaths{}
}

func (m *expandPipelineGlobPaths) Apply(_ context.Context, b *bundle.Bundle) error {
for key, pipeline := range b.Config.Resources.Pipelines {
dir, err := pipeline.ConfigFileDirectory()
if err != nil {
return fmt.Errorf("unable to determine directory for pipeline %s: %w", key, err)
}

expandedLibraries := make([]pipelines.PipelineLibrary, 0)
for i := 0; i < len(pipeline.Libraries); i++ {

library := &pipeline.Libraries[i]
path := getGlobPatternToExpand(library)
if path == "" || !libraries.IsLocalPath(path) {
expandedLibraries = append(expandedLibraries, *library)
continue
}

matches, err := filepath.Glob(filepath.Join(dir, path))
if err != nil {
return err
}

for _, match := range matches {
m, err := filepath.Rel(dir, match)
if err != nil {
return err
}
expandedLibraries = append(expandedLibraries, cloneWithPath(library, m))
}
}
pipeline.Libraries = expandedLibraries
}

return nil
}

func getGlobPatternToExpand(library *pipelines.PipelineLibrary) string {
if library.File != nil {
return library.File.Path
}

if library.Notebook != nil {
return library.Notebook.Path
}

return ""
}

func cloneWithPath(library *pipelines.PipelineLibrary, path string) pipelines.PipelineLibrary {
if library.File != nil {
return pipelines.PipelineLibrary{
File: &pipelines.FileLibrary{
Path: path,
},
}
}

if library.Notebook != nil {
return pipelines.PipelineLibrary{
Notebook: &pipelines.NotebookLibrary{
Path: path,
},
}
}

return pipelines.PipelineLibrary{}
}

func (*expandPipelineGlobPaths) Name() string {
return "ExpandPipelineGlobPaths"
}
154 changes: 154 additions & 0 deletions bundle/config/mutator/expand_pipeline_glob_paths_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package mutator

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/require"
)

func touchEmptyFile(t *testing.T, path string) {
err := os.MkdirAll(filepath.Dir(path), 0700)
require.NoError(t, err)
f, err := os.Create(path)
require.NoError(t, err)
f.Close()
}

func TestExpandGlobPathsInPipelines(t *testing.T) {
dir := t.TempDir()

touchEmptyFile(t, filepath.Join(dir, "test1.ipynb"))
touchEmptyFile(t, filepath.Join(dir, "test/test2.ipynb"))
touchEmptyFile(t, filepath.Join(dir, "test/test3.ipynb"))
touchEmptyFile(t, filepath.Join(dir, "test1.jar"))
touchEmptyFile(t, filepath.Join(dir, "test/test2.jar"))
touchEmptyFile(t, filepath.Join(dir, "test/test3.jar"))
touchEmptyFile(t, filepath.Join(dir, "test1.py"))
touchEmptyFile(t, filepath.Join(dir, "test/test2.py"))
touchEmptyFile(t, filepath.Join(dir, "test/test3.py"))

b := &bundle.Bundle{
Config: config.Root{
Path: dir,
Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{
"pipeline": {
Paths: paths.Paths{
ConfigFilePath: filepath.Join(dir, "resource.yml"),
},
PipelineSpec: &pipelines.PipelineSpec{
Libraries: []pipelines.PipelineLibrary{
{
Notebook: &pipelines.NotebookLibrary{
Path: "./**/*.ipynb",
},
},
{
Jar: "./*.jar",
},
{
File: &pipelines.FileLibrary{
Path: "./**/*.py",
},
},
{
Maven: &compute.MavenLibrary{
Coordinates: "org.jsoup:jsoup:1.7.2",
},
},
{
Notebook: &pipelines.NotebookLibrary{
Path: "./test1.ipynb",
},
},
{
Notebook: &pipelines.NotebookLibrary{
Path: "/Workspace/Users/me@company.com/test.ipynb",
},
},
{
Notebook: &pipelines.NotebookLibrary{
Path: "dbfs:/me@company.com/test.ipynb",
},
},
},
},
},
},
},
},
}

m := ExpandPipelineGlobPaths()
err := bundle.Apply(context.Background(), b, m)
require.NoError(t, err)

libraries := b.Config.Resources.Pipelines["pipeline"].Libraries
require.Len(t, libraries, 9)

// Making sure glob patterns are expanded correctly
require.True(t, containsNotebook(libraries, filepath.Join("test", "test2.ipynb")))
require.True(t, containsNotebook(libraries, filepath.Join("test", "test3.ipynb")))
require.True(t, containsFile(libraries, filepath.Join("test", "test2.py")))
require.True(t, containsFile(libraries, filepath.Join("test", "test3.py")))

// Making sure exact file references work as well
require.True(t, containsNotebook(libraries, "test1.ipynb"))

// Making sure absolute pass to remote FS file references work as well
require.True(t, containsNotebook(libraries, "/Workspace/Users/me@company.com/test.ipynb"))
require.True(t, containsNotebook(libraries, "dbfs:/me@company.com/test.ipynb"))

// Making sure other libraries are not replaced
require.True(t, containsJar(libraries, "./*.jar"))
require.True(t, containsMaven(libraries, "org.jsoup:jsoup:1.7.2"))
}

func containsNotebook(libraries []pipelines.PipelineLibrary, path string) bool {
for _, l := range libraries {
if l.Notebook != nil && l.Notebook.Path == path {
return true
}
}

return false
}

func containsJar(libraries []pipelines.PipelineLibrary, path string) bool {
for _, l := range libraries {
if l.Jar == path {
return true
}
}

return false
}

func containsMaven(libraries []pipelines.PipelineLibrary, coordinates string) bool {
for _, l := range libraries {
if l.Maven != nil && l.Maven.Coordinates == coordinates {
return true
}
}

return false
}

func containsFile(libraries []pipelines.PipelineLibrary, path string) bool {
for _, l := range libraries {
if l.File != nil && l.File.Path == path {
return true
}
}

return false
}
4 changes: 4 additions & 0 deletions bundle/libraries/libraries.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ func isLocalLibrary(library *compute.Library) bool {
return false
}

return IsLocalPath(path)
}

func IsLocalPath(path string) bool {
if isExplicitFileScheme(path) {
return true
}
Expand Down
1 change: 1 addition & 0 deletions bundle/phases/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func Initialize() bundle.Mutator {
),
mutator.OverrideCompute(),
mutator.ProcessTargetMode(),
mutator.ExpandPipelineGlobPaths(),
mutator.TranslatePaths(),
python.WrapperWarning(),
terraform.Initialize(),
Expand Down