Skip to content

Commit

Permalink
feat: accept optional field ID in create asset probe (#35)
Browse files Browse the repository at this point in the history
- Accept an optional ID field for create asset probe. ID, if specified,
  needs to be a valid UUID. The probe is created with the given ID if
  specified in the request and auto-generated otherwise. If the ID
  already exists, an appropriate error is returned.
- Bump proton commit for generating protos.
  • Loading branch information
sudo-suhas authored May 30, 2023
1 parent 97f95b0 commit 751b3a0
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 16 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
NAME="github.com/goto/compass"
VERSION=$(shell git describe --always --tags 2>/dev/null)
COVERFILE="/tmp/compass.coverprofile"
PROTON_COMMIT := "374259a5277ce724ecb0cda5091123db5d55c118"
PROTON_COMMIT := "a6b2821e8ddd1127a63d3b376f860990d58931da"
.PHONY: all build test clean install proto

all: build
Expand Down Expand Up @@ -32,7 +32,6 @@ lint: ## Lint checker

proto: ## Generate the protobuf files
@echo " > generating protobuf from goto/proton"
@echo " > [info] make sure correct version of dependencies are installed using 'make install'"
@buf generate https://github.com/goto/proton/archive/${PROTON_COMMIT}.zip#strip_components=1 --template buf.gen.yaml --path gotocompany/compass -v
@echo " > protobuf compilation finished"

Expand Down
1 change: 1 addition & 0 deletions core/asset/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

var (
ErrEmptyID = errors.New("asset does not have ID")
ErrProbeExists = errors.New("asset probe already exists")
ErrEmptyURN = errors.New("asset does not have URN")
ErrUnknownType = errors.New("unknown type")
ErrNilAsset = errors.New("nil asset")
Expand Down
17 changes: 16 additions & 1 deletion internal/server/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/goto/compass/core/asset"
"github.com/goto/compass/core/star"
"github.com/goto/compass/core/user"
Expand Down Expand Up @@ -318,6 +319,9 @@ func (server *APIServer) CreateAssetProbe(ctx context.Context, req *compassv1bet
return nil, err
}

if req.Probe.Id != "" && !isValidUUID(req.Probe.Id) {
return nil, status.Error(codes.InvalidArgument, "id should be a valid UUID")
}
if req.Probe.Status == "" {
return nil, status.Error(codes.InvalidArgument, "Status is required")
}
Expand All @@ -326,15 +330,21 @@ func (server *APIServer) CreateAssetProbe(ctx context.Context, req *compassv1bet
}

probe := asset.Probe{
ID: req.Probe.Id,
Status: req.Probe.Status,
StatusReason: req.Probe.StatusReason,
Metadata: req.Probe.Metadata.AsMap(),
Timestamp: req.Probe.Timestamp.AsTime(),
}
if err := server.assetService.AddProbe(ctx, req.AssetUrn, &probe); err != nil {
if errors.As(err, &asset.NotFoundError{}) {
switch {
case errors.As(err, &asset.NotFoundError{}):
return nil, status.Error(codes.NotFound, err.Error())

case errors.Is(err, asset.ErrProbeExists):
return nil, status.Error(codes.AlreadyExists, err.Error())
}

return nil, status.Error(codes.Internal, err.Error())
}

Expand Down Expand Up @@ -762,3 +772,8 @@ func diffChangeFromProto(pb *compassv1beta1.Change) diff.Change {
To: toItf,
}
}

func isValidUUID(u string) bool {
_, err := uuid.Parse(u)
return err == nil
}
32 changes: 32 additions & 0 deletions internal/server/v1beta1/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,18 @@ func TestCreateAssetProbe(t *testing.T) {
}

var testCases = []testCase{
{
Description: `should return error if id is not a valid UUID`,
ExpectStatus: codes.InvalidArgument,
Request: &compassv1beta1.CreateAssetProbeRequest{
AssetUrn: assetURN,
Probe: &compassv1beta1.CreateAssetProbeRequest_Probe{
Id: "invaliduuid",
Status: "RUNNING",
Timestamp: timestamppb.New(now),
},
},
},
{
Description: `should return error if status is missing`,
ExpectStatus: codes.InvalidArgument,
Expand Down Expand Up @@ -1269,6 +1281,26 @@ func TestCreateAssetProbe(t *testing.T) {
Return(asset.NotFoundError{URN: assetURN})
},
},
{
Description: `should return already exists if probe already exists`,
ExpectStatus: codes.AlreadyExists,
Request: &compassv1beta1.CreateAssetProbeRequest{
AssetUrn: assetURN,
Probe: &compassv1beta1.CreateAssetProbeRequest_Probe{
Id: probeID,
Status: "RUNNING",
Timestamp: timestamppb.New(now),
},
},
Setup: func(ctx context.Context, as *mocks.AssetService) {
as.EXPECT().AddProbe(ctx, assetURN, &asset.Probe{
ID: probeID,
Status: "RUNNING",
Metadata: map[string]interface{}{},
Timestamp: now,
}).Return(asset.ErrProbeExists)
},
},
{
Description: `should return internal server error if adding probe fails`,
ExpectStatus: codes.Internal,
Expand Down
31 changes: 21 additions & 10 deletions internal/store/postgres/asset_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,21 +346,32 @@ func (r *AssetRepository) AddProbe(ctx context.Context, assetURN string, probe *
probe.Timestamp = probe.Timestamp.UTC()
}

query, args, err := sq.Insert("asset_probes").
Columns("asset_urn", "status", "status_reason", "metadata", "timestamp", "created_at").
Values(assetURN, probe.Status, probe.StatusReason, probe.Metadata, probe.Timestamp, probe.CreatedAt).
Suffix("RETURNING \"id\"").
insert := sq.Insert("asset_probes")
if probe.ID != "" {
insert = insert.Columns("id", "asset_urn", "status", "status_reason", "metadata", "timestamp", "created_at").
Values(probe.ID, assetURN, probe.Status, probe.StatusReason, probe.Metadata, probe.Timestamp, probe.CreatedAt)
} else {
insert = insert.Columns("asset_urn", "status", "status_reason", "metadata", "timestamp", "created_at").
Values(assetURN, probe.Status, probe.StatusReason, probe.Metadata, probe.Timestamp, probe.CreatedAt)
}

query, args, err := insert.Suffix("RETURNING \"id\"").
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return fmt.Errorf("error building insert asset probe query: %w", err)
return fmt.Errorf("build insert asset probe query: %w", err)
}

err = r.client.db.QueryRowContext(ctx, query, args...).Scan(&probe.ID)
if errors.Is(checkPostgresError(err), errForeignKeyViolation) {
return asset.NotFoundError{URN: assetURN}
} else if err != nil {
return fmt.Errorf("error running insert asset probe query: %w", err)
if err = r.client.db.QueryRowContext(ctx, query, args...).Scan(&probe.ID); err != nil {
switch e := checkPostgresError(err); {
case errors.Is(e, errForeignKeyViolation):
return asset.NotFoundError{URN: assetURN}

case errors.Is(e, errDuplicateKey):
return asset.ErrProbeExists
}

return fmt.Errorf("run insert asset probe query: %w", err)
}

return nil
Expand Down
56 changes: 56 additions & 0 deletions internal/store/postgres/asset_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,36 @@ func (r *AssetRepositoryTestSuite) TestAddProbe() {
r.ErrorAs(err, &asset.NotFoundError{URN: urn})
})

r.Run("should return error if probe already exists", func() {
ast := asset.Asset{
URN: "urn-add-probe-1",
Type: asset.TypeJob,
Service: "airflow",
UpdatedBy: user.User{ID: defaultAssetUpdaterUserID},
}
probeID := uuid.NewString()
probe := asset.Probe{
ID: probeID,
Status: "COMPLETED",
StatusReason: "Sample Reason",
Timestamp: time.Now().Add(2 * time.Minute),
Metadata: map[string]interface{}{
"foo": "bar",
},
}

_, err := r.repository.Upsert(r.ctx, &ast)
r.Require().NoError(err)

err = r.repository.AddProbe(r.ctx, ast.URN, &probe)
r.NoError(err)

err = r.repository.AddProbe(r.ctx, ast.URN, &probe)
r.ErrorIs(err, asset.ErrProbeExists)
})

r.Run("should populate CreatedAt and persist probe", func() {
r.BeforeTest("", "")
ast := asset.Asset{
URN: "urn-add-probe-1",
Type: asset.TypeJob,
Expand Down Expand Up @@ -1337,6 +1366,33 @@ func (r *AssetRepositoryTestSuite) TestAddProbe() {
r.Require().NoError(err)
})

r.Run("should insert ID if specified", func() {
ast := asset.Asset{
URN: "urn-add-probe-1",
Type: asset.TypeJob,
Service: "airflow",
UpdatedBy: user.User{ID: defaultAssetUpdaterUserID},
}
probeID := uuid.NewString()
probe := asset.Probe{
ID: probeID,
Status: "COMPLETED",
StatusReason: "Sample Reason",
Timestamp: time.Now().Add(2 * time.Minute),
Metadata: map[string]interface{}{
"foo": "bar",
},
}

_, err := r.repository.Upsert(r.ctx, &ast)
r.Require().NoError(err)

err = r.repository.AddProbe(r.ctx, ast.URN, &probe)
r.NoError(err)

r.Equal(probeID, probe.ID)
})

r.Run("should populate Timestamp if empty", func() {
ast := asset.Asset{
URN: "urn-add-probe-2",
Expand Down
2 changes: 2 additions & 0 deletions proto/compass.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1877,6 +1877,8 @@ definitions:
CreateAssetProbeRequest.Probe:
type: object
properties:
id:
type: string
metadata:
type: object
status:
Expand Down
15 changes: 12 additions & 3 deletions proto/gotocompany/compass/v1beta1/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/gotocompany/compass/v1beta1/service.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 751b3a0

Please sign in to comment.