Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add the ability to validate a single asset #15

Merged
merged 3 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions cmd/lint.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,17 @@ func Lint(isDebug *bool) *cli.Command {

logger := makeLogger(*isDebug)

rootPath := c.Args().Get(0)
if rootPath == "" {
rootPath = "."
repoOrAsset := c.Args().Get(0)
rootPath := repoOrAsset
asset := ""
if isPathReferencingAsset(repoOrAsset) {
asset = repoOrAsset
pipelineRootFromAsset, err := path.GetPipelineRootFromTask(repoOrAsset, pipelineDefinitionFile)
if err != nil {
errorPrinter.Printf("Failed to find the pipeline root for the given asset: %v\n", err)
return cli.Exit("", 1)
}
rootPath = pipelineRootFromAsset
}

logger.Debugf("using root path '%s'", rootPath)
Expand Down Expand Up @@ -119,13 +127,21 @@ func Lint(isDebug *bool) *cli.Command {
logger.Debug("no Snowflake connections found, skipping Snowflake validation")
}

linter := lint.NewLinter(path.GetPipelinePaths, builder, rules, logger)

infoPrinter.Printf("Validating pipelines in '%s' for '%s' environment...\n", rootPath, cm.SelectedEnvironmentName)
result, err := linter.Lint(rootPath, pipelineDefinitionFile)
var result *lint.PipelineAnalysisResult
if asset == "" {
linter := lint.NewLinter(path.GetPipelinePaths, builder, rules, logger)
logger.Debugf("running %d rules for pipeline validation", len(rules))
infoPrinter.Printf("Validating pipelines in '%s' for '%s' environment...\n", rootPath, cm.SelectedEnvironmentName)
result, err = linter.Lint(rootPath, pipelineDefinitionFile)
} else {
rules = lint.FilterRulesByLevel(rules, lint.LevelAsset)
logger.Debugf("running %d rules for asset-only validation", len(rules))
linter := lint.NewLinter(path.GetPipelinePaths, builder, rules, logger)
result, err = linter.LintAsset(rootPath, pipelineDefinitionFile, asset)
}

printer := lint.Printer{RootCheckPath: rootPath}
err = reportLintErrors(result, err, printer)
err = reportLintErrors(result, err, printer, asset)
if err != nil {
return cli.Exit("", 1)
}
Expand All @@ -134,9 +150,9 @@ func Lint(isDebug *bool) *cli.Command {
}
}

func reportLintErrors(result *lint.PipelineAnalysisResult, err error, printer lint.Printer) error {
func reportLintErrors(result *lint.PipelineAnalysisResult, err error, printer lint.Printer, asset string) error {
if err != nil {
errorPrinter.Println("\nAn error occurred while linting the pipelines:")
errorPrinter.Println("\nAn error occurred while linting:")

errorList := unwrapAllErrors(err)
for i, e := range errorList {
Expand All @@ -162,7 +178,11 @@ func reportLintErrors(result *lint.PipelineAnalysisResult, err error, printer li
issueStr += "s"
}

errorPrinter.Printf("\n✘ Checked %d %s and found %d %s, please check above.\n", pipelineCount, pipelineStr, errorCount, issueStr)
if asset == "" {
errorPrinter.Printf("\n✘ Checked %d %s and found %d %s, please check above.\n", pipelineCount, pipelineStr, errorCount, issueStr)
} else {
errorPrinter.Printf("\n✘ Checked %s and found %d %s, please check above.\n", asset, errorCount, issueStr)
}
return errors.New("validation failed")
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func Run(isDebug *bool) *cli.Command {

linter := lint.NewLinter(path.GetPipelinePaths, builder, rules, logger)
res, err := linter.LintPipelines([]*pipeline.Pipeline{foundPipeline})
err = reportLintErrors(res, err, lint.Printer{RootCheckPath: pipelinePath})
err = reportLintErrors(res, err, lint.Printer{RootCheckPath: pipelinePath}, "")
if err != nil {
return cli.Exit("", 1)
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/executor/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ var DefaultExecutorsV2 = map[pipeline.AssetType]Config{
pipeline.AssetTypeEmpty: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
},
"athena.sql": {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
},
"athena.sensor.query": {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
},
pipeline.AssetTypePostgresQuery: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
Expand Down
8 changes: 8 additions & 0 deletions pkg/lint/level.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package lint

type Level int

const (
LevelPipeline Level = iota
LevelAsset
)
97 changes: 93 additions & 4 deletions pkg/lint/lint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lint

import (
"context"
"fmt"
"os"
"sort"
Expand All @@ -14,6 +15,7 @@ import (
type (
pipelineFinder func(root, pipelineDefinitionFile string) ([]string, error)
PipelineValidator func(pipeline *pipeline.Pipeline) ([]*Issue, error)
AssetValidator func(ctx context.Context, pipeline *pipeline.Pipeline, asset *pipeline.Asset) ([]*Issue, error)
)

type pipelineBuilder interface {
Expand All @@ -29,21 +31,37 @@ type Issue struct {
type Rule interface {
Name() string
Validate(pipeline *pipeline.Pipeline) ([]*Issue, error)
ValidateAsset(ctx context.Context, pipeline *pipeline.Pipeline, asset *pipeline.Asset) ([]*Issue, error)
GetApplicableLevels() []Level
}

type SimpleRule struct {
Identifier string
Validator PipelineValidator
Identifier string
Validator PipelineValidator
AssetValidator AssetValidator
ApplicableLevels []Level
}

func (g *SimpleRule) Validate(pipeline *pipeline.Pipeline) ([]*Issue, error) {
return g.Validator(pipeline)
}

func (g *SimpleRule) ValidateAsset(ctx context.Context, pipeline *pipeline.Pipeline, asset *pipeline.Asset) ([]*Issue, error) {
if g.AssetValidator == nil {
return []*Issue{}, errors.New(fmt.Sprintf("the rule '%s' cannot be used to validate assets, please open an issue", g.Identifier))
}

return g.AssetValidator(ctx, pipeline, asset)
}

func (g *SimpleRule) Name() string {
return g.Identifier
}

func (g *SimpleRule) GetApplicableLevels() []Level {
return g.ApplicableLevels
}

type Linter struct {
findPipelines pipelineFinder
builder pipelineBuilder
Expand All @@ -61,6 +79,78 @@ func NewLinter(findPipelines pipelineFinder, builder pipelineBuilder, rules []Ru
}

func (l *Linter) Lint(rootPath, pipelineDefinitionFileName string) (*PipelineAnalysisResult, error) {
pipelines, err := l.extractPipelinesFromPath(rootPath, pipelineDefinitionFileName)
if err != nil {
return nil, err
}

return l.LintPipelines(pipelines)
}

func (l *Linter) LintAsset(rootPath, pipelineDefinitionFileName, assetNameOrPath string) (*PipelineAnalysisResult, error) {
pipelines, err := l.extractPipelinesFromPath(rootPath, pipelineDefinitionFileName)
if err != nil {
return nil, err
}

var assetPipeline *pipeline.Pipeline
var asset *pipeline.Asset
for _, fp := range pipelines {
asset = fp.GetAssetByPath(assetNameOrPath)
if asset != nil {
assetPipeline = fp
l.logger.Debugf("found an asset with path under the pipeline '%s'", fp.DefinitionFile.Path)
break
}
}

if asset == nil {
l.logger.Debugf("couldn't find an asset with the path '%s', trying it as a name instead", assetNameOrPath)

matchedAssetCount := 0
for _, fp := range pipelines {
maybeAsset := fp.GetAssetByName(assetNameOrPath)
if maybeAsset != nil {
matchedAssetCount++
asset = maybeAsset
assetPipeline = fp
}
}

if matchedAssetCount > 1 {
return nil, errors.Errorf("there are %d assets with the name '%s', you'll have to use an asset path or go to the pipeline directory", matchedAssetCount, assetNameOrPath)
}
}

if asset == nil {
return nil, errors.Errorf("failed to find an asset with the path or name '%s' under the path '%s'", assetNameOrPath, rootPath)
}

pipelineResult := &PipelineIssues{
Pipeline: assetPipeline,
Issues: make(map[Rule][]*Issue),
}

// now the actual validation starts
for _, rule := range l.rules {
issues, err := rule.ValidateAsset(context.TODO(), assetPipeline, asset)
if err != nil {
return nil, err
}

if len(issues) > 0 {
pipelineResult.Issues[rule] = issues
}
}

return &PipelineAnalysisResult{
Pipelines: []*PipelineIssues{
pipelineResult,
},
}, nil
}

func (l *Linter) extractPipelinesFromPath(rootPath string, pipelineDefinitionFileName string) ([]*pipeline.Pipeline, error) {
pipelinePaths, err := l.findPipelines(rootPath, pipelineDefinitionFileName)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
Expand Down Expand Up @@ -95,8 +185,7 @@ func (l *Linter) Lint(rootPath, pipelineDefinitionFileName string) (*PipelineAna
}

l.logger.Debugf("constructed %d pipelines", len(pipelines))

return l.LintPipelines(pipelines)
return pipelines, nil
}

type PipelineAnalysisResult struct {
Expand Down
Loading
Loading