Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge 4bbe0f8 into 78b3e14
Browse files Browse the repository at this point in the history
  • Loading branch information
Yicheng-Lu-llll committed Jun 30, 2023
2 parents 78b3e14 + 4bbe0f8 commit 02d399f
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 0 deletions.
12 changes: 12 additions & 0 deletions dataproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ func (s Service) CreateDownloadLink(ctx context.Context, req *service.CreateDown
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "no deckUrl found for request [%+v]", req)
}

// Check if the native url exists
metadata, err := s.dataStore.Head(ctx, storage.DataReference(nativeURL))
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Failed to check the existence of the URL [%s]. Error: %v", nativeURL, err)

Check warning on line 144 in dataproxy/service.go

View check run for this annotation

Codecov / codecov/patch

dataproxy/service.go#L144

Added line #L144 was not covered by tests
}
if !metadata.Exists() {
return nil, errors.NewFlyteAdminErrorf(
codes.NotFound,
"URL [%s] does not exist yet. Please try again later. If you are using the real-time deck, this could be because the 'persist' function has not been called yet.",
nativeURL)
}

signedURLResp, err := s.dataStore.CreateSignedURL(ctx, storage.DataReference(nativeURL), storage.SignedURLProperties{
Scope: stow.ClientMethodGet,
ExpiresIn: req.ExpiresIn.AsDuration(),
Expand Down
36 changes: 36 additions & 0 deletions dataproxy/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func TestCreateUploadLocation(t *testing.T) {

func TestCreateDownloadLink(t *testing.T) {
dataStore := commonMocks.GetMockStorageClient()
dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, reference storage.DataReference) (storage.Metadata, error) {
return existsMetadata{}, nil
}
nodeExecutionManager := &mocks.MockNodeExecutionManager{}
nodeExecutionManager.SetGetNodeExecutionFunc(func(ctx context.Context, request admin.NodeExecutionGetRequest) (*admin.NodeExecution, error) {
return &admin.NodeExecution{
Expand Down Expand Up @@ -127,6 +130,19 @@ func TestCreateDownloadLink(t *testing.T) {
})
assert.NoError(t, err)
})

t.Run("nonexistent URI", func(t *testing.T) {
dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, reference storage.DataReference) (storage.Metadata, error) {
return nonexistentMetadata{}, nil
}
_, err = s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{
ArtifactType: service.ArtifactType_ARTIFACT_TYPE_DECK,
Source: &service.CreateDownloadLinkRequest_NodeExecutionId{
NodeExecutionId: &core.NodeExecutionIdentifier{},
},
})
assert.Error(t, err)
})
}

func TestCreateDownloadLocation(t *testing.T) {
Expand Down Expand Up @@ -350,3 +366,23 @@ func TestService_Error(t *testing.T) {
assert.Error(t, err, "no task executions")
})
}

type existsMetadata struct{}

func (e existsMetadata) Exists() bool {
return true
}

func (e existsMetadata) Size() int64 {
return int64(1)
}

type nonexistentMetadata struct{}

func (e nonexistentMetadata) Exists() bool {
return false
}

func (e nonexistentMetadata) Size() int64 {
return int64(0)
}
1 change: 1 addition & 0 deletions pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution
"failed to marshal occurredAt into a timestamp proto with error: %v", err)
}
closure.StartedAt = startedAtProto
closure.DeckUri = request.Event.DeckUri
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var childExecutionID = &core.WorkflowExecutionIdentifier{
const dynamicWorkflowClosureRef = "s3://bucket/admin/metadata/workflow"

const testInputURI = "fake://bucket/inputs.pb"
const DeckURI = "fake://bucket/deck.html"

var testInputs = &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand All @@ -69,6 +70,7 @@ func TestAddRunningState(t *testing.T) {
Event: &event.NodeExecutionEvent{
Phase: core.NodeExecution_RUNNING,
OccurredAt: startedAtProto,
DeckUri: DeckURI,
},
}
nodeExecutionModel := models.NodeExecution{}
Expand All @@ -77,6 +79,7 @@ func TestAddRunningState(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, startedAt, *nodeExecutionModel.StartedAt)
assert.True(t, proto.Equal(startedAtProto, closure.StartedAt))
assert.Equal(t, DeckURI, closure.DeckUri)
}

func TestAddTerminalState_OutputURI(t *testing.T) {
Expand All @@ -88,6 +91,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
OutputUri: outputURI,
},
OccurredAt: occurredAtProto,
DeckUri: DeckURI,
},
}
startedAt := occurredAt.Add(-time.Minute)
Expand All @@ -103,6 +107,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, outputURI, closure.GetOutputUri())
assert.Equal(t, time.Minute, nodeExecutionModel.Duration)
assert.Equal(t, DeckURI, closure.DeckUri)
}

func TestAddTerminalState_OutputData(t *testing.T) {
Expand Down

0 comments on commit 02d399f

Please sign in to comment.