This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 22
/
tag_manager.go
103 lines (87 loc) · 3.76 KB
/
tag_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package impl
import (
"context"
"time"
"github.com/flyteorg/datacatalog/pkg/manager/impl/validators"
"github.com/flyteorg/datacatalog/pkg/manager/interfaces"
"github.com/flyteorg/datacatalog/pkg/repositories"
"github.com/flyteorg/datacatalog/pkg/repositories/models"
"github.com/flyteorg/datacatalog/pkg/repositories/transformers"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog"
"github.com/flyteorg/datacatalog/pkg/errors"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
"github.com/flyteorg/flytestdlib/storage"
)
// tagMetrics groups the metrics recorded by tagManager while serving AddTag.
type tagMetrics struct {
	// scope is the metrics scope this set of metrics was built with.
	scope promutils.Scope

	// createResponseTime times each AddTag call from entry to return.
	createResponseTime labeled.StopWatch

	// addTagSuccessCounter is incremented when a tag has been created.
	addTagSuccessCounter labeled.Counter

	// addTagFailureCounter is incremented when the dataset lookup, the
	// artifact lookup, or the tag creation fails for a reason other than the
	// tag already existing.
	addTagFailureCounter labeled.Counter

	// validationErrorCounter is incremented when the requested tag fails
	// validation.
	validationErrorCounter labeled.Counter

	// alreadyExistsCounter is incremented when tag creation is rejected with
	// an already-exists error.
	alreadyExistsCounter labeled.Counter
}
// tagManager is the tag-management implementation returned by NewTagManager.
type tagManager struct {
	// repo gives access to the dataset, artifact and tag repositories.
	repo repositories.RepositoryInterface

	// store is the data store handed in at construction time; AddTag does not
	// read it.
	store *storage.DataStore

	// systemMetrics holds the timers and counters updated by AddTag.
	systemMetrics tagMetrics
}
// AddTag attaches a tag to an existing artifact of an existing dataset.
//
// The tag in the request is validated first. The dataset and the artifact it
// refers to are then looked up, and only when both lookups succeed is the tag
// record created. Errors from validation, from either lookup, and from the
// create call are returned to the caller unchanged together with a nil
// response. On success an empty AddTagResponse is returned.
//
// The whole call is timed by createResponseTime, and exactly one of
// validationErrorCounter, addTagFailureCounter, alreadyExistsCounter or
// addTagSuccessCounter is incremented per call.
func (m *tagManager) AddTag(ctx context.Context, request *datacatalog.AddTagRequest) (*datacatalog.AddTagResponse, error) {
	timer := m.systemMetrics.createResponseTime.Start(ctx)
	defer timer.Stop()

	// Read the tag through the generated getter so that a nil request is
	// handed to the validator instead of panicking on a field access.
	tag := request.GetTag()
	if err := validators.ValidateTag(tag); err != nil {
		logger.Warnf(ctx, "Invalid add tag request %+v err: %v", request, err)
		m.systemMetrics.validationErrorCounter.Inc(ctx)
		return nil, err
	}

	// Verify that both the dataset and the artifact exist before tagging.
	// NOTE(review): the field accesses below assume ValidateTag rejects a nil
	// tag and a nil dataset ID - confirm against the validator.
	datasetID := tag.Dataset
	// Attach project/domain to the context used by the calls and metrics below.
	ctx = contextutils.WithProjectDomain(ctx, datasetID.Project, datasetID.Domain)

	datasetKey := transformers.FromDatasetID(datasetID)
	dataset, err := m.repo.DatasetRepo().Get(ctx, datasetKey)
	if err != nil {
		m.systemMetrics.addTagFailureCounter.Inc(ctx)
		return nil, err
	}

	artifactKey := transformers.ToArtifactKey(datasetID, tag.ArtifactId)
	if _, err := m.repo.ArtifactRepo().Get(ctx, artifactKey); err != nil {
		m.systemMetrics.addTagFailureCounter.Inc(ctx)
		return nil, err
	}

	tagKey := transformers.ToTagKey(datasetID, tag.Name)
	err = m.repo.TagRepo().Create(ctx, models.Tag{
		TagKey:      tagKey,
		ArtifactID:  tag.ArtifactId,
		DatasetUUID: dataset.UUID,
	})
	if err != nil {
		// An already-existing tag is counted and logged separately from
		// other create failures; the error is returned in both cases.
		if errors.IsAlreadyExistsError(err) {
			logger.Warnf(ctx, "Tag already exists key: %+v, err %v", request, err)
			m.systemMetrics.alreadyExistsCounter.Inc(ctx)
		} else {
			logger.Errorf(ctx, "Failed to tag artifact: %+v err: %v", request, err)
			m.systemMetrics.addTagFailureCounter.Inc(ctx)
		}
		return nil, err
	}

	m.systemMetrics.addTagSuccessCounter.Inc(ctx)
	return &datacatalog.AddTagResponse{}, nil
}
// NewTagManager returns an interfaces.TagManager backed by the given
// repository and data store.
//
// The manager's stopwatch and counters are all created from tagScope with the
// labeled.EmitUnlabeledMetric option; the stopwatch is created with
// time.Millisecond as its scale.
func NewTagManager(repo repositories.RepositoryInterface, store *storage.DataStore, tagScope promutils.Scope) interfaces.TagManager {
	return &tagManager{
		repo:  repo,
		store: store,
		// The metrics are built in place, in the same order as the fields
		// are declared on tagMetrics.
		systemMetrics: tagMetrics{
			scope: tagScope,
			createResponseTime: labeled.NewStopWatch(
				"create_duration",
				"The duration of the add tag calls.",
				time.Millisecond,
				tagScope,
				labeled.EmitUnlabeledMetric,
			),
			addTagSuccessCounter: labeled.NewCounter(
				"create_success_count",
				"The number of times an artifact was tagged successfully",
				tagScope,
				labeled.EmitUnlabeledMetric,
			),
			addTagFailureCounter: labeled.NewCounter(
				"create_failure_count",
				"The number of times we failed to tag an artifact",
				tagScope,
				labeled.EmitUnlabeledMetric,
			),
			validationErrorCounter: labeled.NewCounter(
				"validation_failed_count",
				"The number of times we failed validate a tag",
				tagScope,
				labeled.EmitUnlabeledMetric,
			),
			alreadyExistsCounter: labeled.NewCounter(
				"already_exists_count",
				"The number of times an tag already exists",
				tagScope,
				labeled.EmitUnlabeledMetric,
			),
		},
	}
}