Skip to content

Commit

Permalink
Merge pull request #52 from bruin-data/feature/support-tags
Browse files Browse the repository at this point in the history
Feature/support tags
  • Loading branch information
karakanb committed Jun 4, 2024
2 parents f33de3b + 47f19e7 commit f23f921
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 27 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ You should get an output that looks like this:
Pipeline: bruin-example (.)
No issues found

✓ Successfully validated 2 tasks across 1 pipeline, all good.
✓ Successfully validated 2 assets across 1 pipeline, all good.
```

If you have defined your credentials, bruin will automatically detect them and validate all of your queries using
Expand Down
2 changes: 1 addition & 1 deletion cmd/lint.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func reportLintErrors(result *lint.PipelineAnalysisResult, err error, printer li
taskCount += len(p.Pipeline.Assets)
}

successPrinter.Printf("\n✓ Successfully validated %d tasks across %d %s, all good.\n", taskCount, pipelineCount, pipelineStr)
successPrinter.Printf("\n✓ Successfully validated %d assets across %d %s, all good.\n", taskCount, pipelineCount, pipelineStr)
return nil
}

Expand Down
30 changes: 28 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func Run(isDebug *bool) *cli.Command {
Aliases: []string{"r"},
Usage: "truncate the table before running",
},
&cli.StringFlag{
Name: "tag",
Aliases: []string{"t"},
Usage: "pick the assets with the given tag",
},
},
Action: func(c *cli.Context) error {
defer func() {
Expand Down Expand Up @@ -170,8 +175,12 @@ func Run(isDebug *bool) *cli.Command {
}

runningForAnAsset := isPathReferencingAsset(inputPath)
var task *pipeline.Asset
if runningForAnAsset && c.String("tag") != "" {
errorPrinter.Printf("You cannot use the '--tag' flag when running a single asset.\n")
return cli.Exit("", 1)
}

var task *pipeline.Asset
runDownstreamTasks := false
if runningForAnAsset {
task, err = DefaultPipelineBuilder.CreateAssetFromFile(inputPath)
Expand Down Expand Up @@ -251,14 +260,31 @@ func Run(isDebug *bool) *cli.Command {

s := scheduler.NewScheduler(logger, foundPipeline, c.Bool("push-metadata"))

infoPrinter.Printf("\nStarting the pipeline execution...\n\n")
infoPrinter.Printf("\nStarting the pipeline execution...\n")

if task != nil {
logger.Debug("marking single task to run: ", task.Name)
s.MarkAll(scheduler.Succeeded)
s.MarkTask(task, scheduler.Pending, runDownstreamTasks)
}

tag := c.String("tag")
if tag != "" {
assetsByTag := foundPipeline.GetAssetsByTag(tag)
if len(assetsByTag) == 0 {
errorPrinter.Printf("No assets found with the tag '%s'\n", tag)
return cli.Exit("", 1)
}

logger.Debugf("marking assets with tag '%s' to run", tag)
s.MarkAll(scheduler.Succeeded)
s.MarkByTag(tag, scheduler.Pending, runDownstreamTasks)

infoPrinter.Printf("Running only the assets with tag '%s', found %d assets.\n", tag, len(assetsByTag))
}

infoPrinter.Println()

mainExecutors, err := setupExecutors(s, cm, connectionManager, startDate, endDate, runID, c.Bool("full-refresh"))
if err != nil {
errorPrinter.Printf(err.Error())
Expand Down
20 changes: 3 additions & 17 deletions examples/simple-pipeline/assets/basic.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
""" @bruin
name: basic
type: python
columns:
- name: date
checks:
- name: not_null
- name: wind
checks:
- name: not_null
tags:
- tag1
- tag2
custom_checks:
- name: check1
query:
SELECT COUNT(DISTINCT power_plant_id)
FROM earth_external.epias_power_generation
WHERE loaded_at = (
SELECT MAX(loaded_at)
FROM `earth_external.epias_power_generation`
)
value: 12
@bruin """

import json
Expand Down
42 changes: 42 additions & 0 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ type Asset struct {
Instance string `json:"instance"`
Owner string `json:"owner"`
Metadata EmptyStringMap `json:"metadata"`
Tags EmptyStringArray `json:"tags"`

Pipeline *Pipeline `json:"-"`

Expand Down Expand Up @@ -441,6 +442,34 @@ func (b *EmptyStringMap) UnmarshalJSON(data []byte) error {
return nil
}

// EmptyStringArray is a list of strings that always serializes to a JSON
// array: a nil value is written as [] instead of null.
type EmptyStringArray []string

// MarshalJSON encodes the receiver as a JSON array of strings, substituting
// an empty array when the receiver is nil.
func (a EmptyStringArray) MarshalJSON() ([]byte, error) {
	items := []string(a)
	if items == nil {
		items = []string{}
	}

	return json.Marshal(items)
}

// UnmarshalJSON decodes a JSON array of strings into the receiver.
//
// A nil input, or a payload that decodes to zero elements (for example [] or
// null), leaves the receiver untouched. Decoding errors are returned as-is.
func (a *EmptyStringArray) UnmarshalJSON(data []byte) error {
	if data == nil {
		return nil
	}

	var decoded []string
	err := json.Unmarshal(data, &decoded)
	switch {
	case err != nil:
		return err
	case len(decoded) > 0:
		// Only overwrite the receiver when there is something to store.
		*a = decoded
	}

	return nil
}

type AssetCollection []*Asset

func (ac AssetCollection) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -626,6 +655,19 @@ func (p *Pipeline) GetAssetByName(assetName string) *Asset {
return asset
}

// GetAssetsByTag returns the assets of the pipeline whose tag list contains
// the given tag, in the order they appear in p.Assets.
//
// Each matching asset is returned exactly once, even if its tag list repeats
// the tag. When nothing matches, the result is an empty, non-nil slice.
func (p *Pipeline) GetAssetsByTag(tag string) []*Asset {
	assets := make([]*Asset, 0)
	for _, asset := range p.Assets {
		for _, t := range asset.Tags {
			if t == tag {
				assets = append(assets, asset)
				// Stop at the first match so an asset that lists the same
				// tag more than once is not returned (and counted) twice.
				break
			}
		}
	}

	return assets
}

type TaskCreator func(path string) (*Asset, error)

type BuilderConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func TestPipeline_JsonMarshal(t *testing.T) {

// uncomment the line below and run the test once to refresh the data
// don't forget to comment it out again
// err = afero.WriteFile(afero.NewOsFs(), path, bytes.ReplaceAll(got, []byte(dir), []byte("__BASEDIR__")), 0644)
// err = afero.WriteFile(afero.NewOsFs(), path, bytes.ReplaceAll(got, []byte(dir), []byte("__BASEDIR__")), 0o644)

expected := strings.ReplaceAll(mustRead(t, path), "__BASEDIR__", dir)

Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/testdata/pipeline/first-pipeline_unix.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"legacy_id":"first-pipeline","name":"first-pipeline","schedule":"","start_date":"","definition_file":{"name":"pipeline.yml","path":"__BASEDIR__/testdata/pipeline/first-pipeline/pipeline.yml"},"default_parameters":{"param1":"value1","param2":"value2"},"default_connections":{"gcpConnectionId":"gcp-connection-id-here","slack":"slack-connection"},"assets":[{"id":"943be81e20336c53de2c8ab40991839ca3b88bcb4f854f03cdbd69825eb369b6","name":"task1","description":"This is a hello world task","type":"bash","executable_file":{"name":"hello.sh","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/task1/hello.sh","content":"echo \"hello world from test script\""},"definition_file":{"name":"task.yml","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/task1/task.yml","type":"yaml"},"parameters":{"param1":"value1","param2":"value2"},"connection":"conn1","secrets":[],"upstream":["gcs-to-bq"],"materialization":null,"columns":[],"custom_checks":[],"image":"","instance":"","owner":"","metadata":{}},{"id":"c69409a1840ddb3639a4acbaaec46c238c63b6431cc74ee5254b6dcef7b88c4b","name":"second-task","description":"","type":"bq.transfer","executable_file":{"name":"","path":"","content":""},"definition_file":{"name":"task.yaml","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/task2/task.yaml","type":"yaml"},"parameters":{"location":"europe-west1","project_id":"a-new-project-id","transfer_config_id":"some-uuid"},"connection":"","secrets":[],"upstream":[],"materialization":null,"columns":[],"custom_checks":[],"image":"","instance":"","owner":"","metadata":{}},{"id":"21f2fa1b09d584a6b4fe30cd82b4540b769fd777da7c547353386e2930291ef9","name":"some-python-task","description":"some description goes here","type":"python","executable_file":{"name":"test.py","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/test.py","content":"# @bruin.name: some-python-task\n# @bruin.description: some description goes here\n# @bruin.depends: task1, task2\n# @bruin.depends: task3,task4\n# 
@bruin.depends: task5, task3\n# @bruin.parameters.param1: first-parameter\n# @bruin.parameters.param2: second-parameter\n# @bruin.parameters.param3: third-parameter\n# @bruin.connection: first-connection\n\nprint('hello world')"},"definition_file":{"name":"test.py","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/test.py","type":"comment"},"parameters":{"param1":"first-parameter","param2":"second-parameter","param3":"third-parameter"},"connection":"first-connection","secrets":[],"upstream":["task1","task2","task3","task4","task5","task3"],"materialization":null,"columns":[],"custom_checks":[],"image":"","instance":"","owner":"","metadata":{}},{"id":"5812ba61bb0f08ce192bf074c9de21c19355e08cd52e75d008bbff59e5729e5b","name":"some-sql-task","description":"some description goes here","type":"bq.sql","executable_file":{"name":"test.sql","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/test.sql","content":"-- @bruin.name: some-sql-task\n-- @bruin.description: some description goes here\n-- @bruin.type: bq.sql\n-- @bruin.depends: task1, task2\n-- @bruin.depends: task3,task4\n-- @bruin.depends: task5, task3\n-- @bruin.parameters.param1: first-parameter\n-- @bruin.parameters.param2: second-parameter\n-- @bruin.connection: conn2\n\nselect *\nfrom foo;"},"definition_file":{"name":"test.sql","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/test.sql","type":"comment"},"parameters":{"param1":"first-parameter","param2":"second-parameter"},"connection":"conn2","secrets":[],"upstream":["task1","task2","task3","task4","task5","task3"],"materialization":null,"columns":[],"custom_checks":[],"image":"","instance":"","owner":"","metadata":{}}],"notifications":{"slack":[]},"catchup":false,"retries":3}
{"legacy_id":"first-pipeline","name":"first-pipeline","schedule":"","start_date":"","definition_file":{"name":"pipeline.yml","path":"__BASEDIR__/testdata/pipeline/first-pipeline/pipeline.yml"},"default_parameters":{"param1":"value1","param2":"value2"},"default_connections":{"gcpConnectionId":"gcp-connection-id-here","slack":"slack-connection"},"assets":[{"id":"943be81e20336c53de2c8ab40991839ca3b88bcb4f854f03cdbd69825eb369b6","name":"task1","description":"This is a hello world task","type":"bash","executable_file":{"name":"hello.sh","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/task1/hello.sh","content":"echo \"hello world from test script\""},"definition_file":{"name":"task.yml","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/task1/task.yml","type":"yaml"},"parameters":{"param1":"value1","param2":"value2"},"connection":"conn1","secrets":[],"upstream":["gcs-to-bq"],"materialization":null,"columns":[],"custom_checks":[],"image":"","instance":"","owner":"","metadata":{},"tags":[]},{"id":"c69409a1840ddb3639a4acbaaec46c238c63b6431cc74ee5254b6dcef7b88c4b","name":"second-task","description":"","type":"bq.transfer","executable_file":{"name":"","path":"","content":""},"definition_file":{"name":"task.yaml","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/task2/task.yaml","type":"yaml"},"parameters":{"location":"europe-west1","project_id":"a-new-project-id","transfer_config_id":"some-uuid"},"connection":"","secrets":[],"upstream":[],"materialization":null,"columns":[],"custom_checks":[],"image":"","instance":"","owner":"","metadata":{},"tags":[]},{"id":"21f2fa1b09d584a6b4fe30cd82b4540b769fd777da7c547353386e2930291ef9","name":"some-python-task","description":"some description goes here","type":"python","executable_file":{"name":"test.py","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/test.py","content":"# @bruin.name: some-python-task\n# @bruin.description: some description goes here\n# @bruin.depends: task1, task2\n# 
@bruin.depends: task3,task4\n# @bruin.depends: task5, task3\n# @bruin.parameters.param1: first-parameter\n# @bruin.parameters.param2: second-parameter\n# @bruin.parameters.param3: third-parameter\n# @bruin.connection: first-connection\n\nprint('hello world')"},"definition_file":{"name":"test.py","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/test.py","type":"comment"},"parameters":{"param1":"first-parameter","param2":"second-parameter","param3":"third-parameter"},"connection":"first-connection","secrets":[],"upstream":["task1","task2","task3","task4","task5","task3"],"materialization":null,"columns":[],"custom_checks":[],"image":"","instance":"","owner":"","metadata":{},"tags":[]},{"id":"5812ba61bb0f08ce192bf074c9de21c19355e08cd52e75d008bbff59e5729e5b","name":"some-sql-task","description":"some description goes here","type":"bq.sql","executable_file":{"name":"test.sql","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/test.sql","content":"-- @bruin.name: some-sql-task\n-- @bruin.description: some description goes here\n-- @bruin.type: bq.sql\n-- @bruin.depends: task1, task2\n-- @bruin.depends: task3,task4\n-- @bruin.depends: task5, task3\n-- @bruin.parameters.param1: first-parameter\n-- @bruin.parameters.param2: second-parameter\n-- @bruin.connection: conn2\n\nselect *\nfrom foo;"},"definition_file":{"name":"test.sql","path":"__BASEDIR__/testdata/pipeline/first-pipeline/tasks/test.sql","type":"comment"},"parameters":{"param1":"first-parameter","param2":"second-parameter"},"connection":"conn2","secrets":[],"upstream":["task1","task2","task3","task4","task5","task3"],"materialization":null,"columns":[],"custom_checks":[],"image":"","instance":"","owner":"","metadata":{},"tags":[]}],"notifications":{"slack":[]},"catchup":false,"retries":3}
Loading

0 comments on commit f23f921

Please sign in to comment.