forked from flyteorg/datacatalog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dataset_manager.go
121 lines (106 loc) · 5.08 KB
/
dataset_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package impl
import (
"context"
"github.com/lyft/datacatalog/pkg/manager/impl/validators"
"github.com/lyft/datacatalog/pkg/manager/interfaces"
"github.com/lyft/datacatalog/pkg/repositories"
"github.com/lyft/datacatalog/pkg/repositories/transformers"
datacatalog "github.com/lyft/datacatalog/protos/gen"
"github.com/lyft/datacatalog/pkg/errors"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/lyft/flytestdlib/storage"
)
// datasetMetrics groups the counters the dataset manager increments while
// serving create and get requests.
type datasetMetrics struct {
	// scope is the metrics scope the counters were created under.
	scope promutils.Scope

	// Incremented by CreateDataset on success / on a repository failure that
	// is not an "already exists" error.
	createSuccessCounter, createErrorCounter labeled.Counter

	// Incremented by GetDataset on success / on a repository failure that is
	// not a "does not exist" error.
	getSuccessCounter, getErrorCounter labeled.Counter

	// Incremented when a model transformation fails / when request validation
	// fails; shared by both create and get.
	transformerErrorCounter, validationErrorCounter labeled.Counter

	// Incremented when create hits an existing dataset / when get finds none.
	alreadyExistsCounter, doesNotExistCounter labeled.Counter
}
// datasetManager is the implementation returned by NewDatasetManager. It
// serves dataset create/get requests against the repository layer and records
// the outcome of each call in systemMetrics.
type datasetManager struct {
	// repo gives access to the dataset repository via DatasetRepo().
	repo repositories.RepositoryInterface

	// store is kept from the constructor; the methods in this file do not
	// read it.
	store *storage.DataStore

	// systemMetrics holds the counters bumped on every success/failure path.
	systemMetrics datasetMetrics
}
// CreateDataset creates a Dataset with optional metadata. If one already
// exists a grpc AlreadyExists err will be returned.
//
// The flow is: validate the dataset ID, transform the dataset into its
// repository model, then create it through the dataset repository. Every
// failure path increments the matching counter and returns the underlying
// error unchanged; on success an empty CreateDatasetResponse is returned.
func (dm *datasetManager) CreateDataset(ctx context.Context, request datacatalog.CreateDatasetRequest) (*datacatalog.CreateDatasetResponse, error) {
	// Go through the generated getters rather than request.Dataset.Id: the
	// getters tolerate an unset Dataset, so a request without a dataset reaches
	// the validator as a nil ID instead of panicking on a nil dereference here.
	// NOTE(review): this relies on ValidateDatasetID rejecting a nil ID, which
	// GetDataset already depends on before it dereferences request.Dataset.
	err := validators.ValidateDatasetID(request.GetDataset().GetId())
	if err != nil {
		logger.Warnf(ctx, "Invalid create dataset request %+v err: %v", request, err)
		dm.systemMetrics.validationErrorCounter.Inc(ctx)
		return nil, err
	}

	datasetModel, err := transformers.CreateDatasetModel(request.Dataset)
	if err != nil {
		logger.Errorf(ctx, "Unable to transform create dataset request %+v err: %v", request, err)
		dm.systemMetrics.transformerErrorCounter.Inc(ctx)
		return nil, err
	}

	err = dm.repo.DatasetRepo().Create(ctx, *datasetModel)
	if err != nil {
		// An already-existing dataset is an expected client-side condition, so
		// it is logged as a warning and counted separately from real failures.
		if errors.IsAlreadyExistsError(err) {
			logger.Warnf(ctx, "Dataset already exists key: %+v, err %v", request.Dataset, err)
			dm.systemMetrics.alreadyExistsCounter.Inc(ctx)
		} else {
			logger.Errorf(ctx, "Failed to create dataset model: %+v err: %v", datasetModel, err)
			dm.systemMetrics.createErrorCounter.Inc(ctx)
		}
		return nil, err
	}

	logger.Debugf(ctx, "Successfully created dataset %+v", request.Dataset)
	dm.systemMetrics.createSuccessCounter.Inc(ctx)
	return &datacatalog.CreateDatasetResponse{}, nil
}
// GetDataset gets a Dataset with the given DatasetID if it exists. If none
// exist a grpc NotFound err will be returned.
//
// The flow is: validate the dataset ID, look the dataset up in the dataset
// repository, then transform the stored model into the response type. Every
// failure path logs, increments the matching counter and returns the
// underlying error unchanged.
func (dm *datasetManager) GetDataset(ctx context.Context, request datacatalog.GetDatasetRequest) (*datacatalog.GetDatasetResponse, error) {
	err := validators.ValidateDatasetID(request.Dataset)
	if err != nil {
		logger.Warnf(ctx, "Invalid get dataset request %+v err: %v", request, err)
		dm.systemMetrics.validationErrorCounter.Inc(ctx)
		return nil, err
	}

	// NOTE(review): the dereference assumes ValidateDatasetID has already
	// rejected a nil request.Dataset — confirm against the validator.
	datasetKey := transformers.FromDatasetID(*request.Dataset)
	datasetModel, err := dm.repo.DatasetRepo().Get(ctx, datasetKey)
	if err != nil {
		// A missing dataset is an expected client-side condition, so it is
		// logged as a warning and counted separately from real failures.
		if errors.IsDoesNotExistError(err) {
			logger.Warnf(ctx, "Dataset does not exist key: %+v, err %v", datasetKey, err)
			dm.systemMetrics.doesNotExistCounter.Inc(ctx)
		} else {
			logger.Errorf(ctx, "Unable to get dataset request %+v err: %v", request, err)
			dm.systemMetrics.getErrorCounter.Inc(ctx)
		}
		return nil, err
	}

	datasetResponse, err := transformers.FromDatasetModel(datasetModel)
	if err != nil {
		// Log here as well: every other failure path in this manager leaves a
		// log line next to its counter, and this one was silent.
		logger.Errorf(ctx, "Unable to transform dataset model for key: %+v err: %v", datasetKey, err)
		dm.systemMetrics.transformerErrorCounter.Inc(ctx)
		return nil, err
	}

	dm.systemMetrics.getSuccessCounter.Inc(ctx)
	return &datacatalog.GetDatasetResponse{
		Dataset: datasetResponse,
	}, nil
}
// NewDatasetManager returns an interfaces.DatasetManager that works against
// repo and store, with all of its counters created under datasetScope.
func NewDatasetManager(repo repositories.RepositoryInterface, store *storage.DataStore, datasetScope promutils.Scope) interfaces.DatasetManager {
	// All counters share the scope and the unlabeled-metric option; only the
	// name and help text differ.
	newCounter := func(name, description string) labeled.Counter {
		return labeled.NewCounter(name, description, datasetScope, labeled.EmitUnlabeledMetric)
	}

	metrics := datasetMetrics{scope: datasetScope}
	metrics.createSuccessCounter = newCounter("create_success_count", "The number of times create dataset was called")
	metrics.getSuccessCounter = newCounter("get_success_count", "The number of times get dataset was called")
	metrics.createErrorCounter = newCounter("create_failed_count", "The number of times create dataset failed")
	metrics.getErrorCounter = newCounter("get_failed_count", "The number of times get dataset failed")
	metrics.transformerErrorCounter = newCounter("transformer_failed_count", "The number of times transformations failed")
	metrics.validationErrorCounter = newCounter("validation_failed_count", "The number of times validation failed")
	metrics.alreadyExistsCounter = newCounter("already_exists_count", "The number of times a dataset already exists")
	metrics.doesNotExistCounter = newCounter("does_not_exists_count", "The number of times a dataset was not found")

	manager := &datasetManager{
		repo:          repo,
		store:         store,
		systemMetrics: metrics,
	}
	return manager
}