Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ $(eval $(call makemock, internal/orchestrator, Orchestrator, orchestra
$(eval $(call makemock, internal/apiserver, Server, apiservermocks))
$(eval $(call makemock, internal/apiserver, IServer, apiservermocks))
$(eval $(call makemock, internal/metrics, Manager, metricsmocks))
$(eval $(call makemock, internal/operations, Manager, operationmocks))

firefly-nocgo: ${GOFILES}
CGO_ENABLED=0 $(VGO) build -o ${BINARY_NAME}-nocgo -ldflags "-X main.buildDate=`date -u +\"%Y-%m-%dT%H:%M:%SZ\"` -X main.buildVersion=$(BUILD_VERSION)" -tags=prod -tags=prod -v
Expand All @@ -91,4 +92,4 @@ swagger:
manifest:
./manifestgen.sh
docker:
./docker_build.sh $(DOCKER_ARGS)
./docker_build.sh $(DOCKER_ARGS)
3 changes: 3 additions & 0 deletions db/migrations/postgres/000069_add_operation_retry.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
BEGIN;
ALTER TABLE operations DROP COLUMN retry_id;
COMMIT;
3 changes: 3 additions & 0 deletions db/migrations/postgres/000069_add_operation_retry.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
BEGIN;
ALTER TABLE operations ADD COLUMN retry_id UUID;
COMMIT;
1 change: 1 addition & 0 deletions db/migrations/sqlite/000069_add_operation_retry.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE operations DROP COLUMN retry_id;
1 change: 1 addition & 0 deletions db/migrations/sqlite/000069_add_operation_retry.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE operations ADD COLUMN retry_id UUID;
80 changes: 80 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5092,6 +5092,7 @@ paths:
type: object
plugin:
type: string
retry: {}
status:
type: string
tx: {}
Expand Down Expand Up @@ -5781,6 +5782,11 @@ paths:
name: plugin
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: retry
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: status
Expand Down Expand Up @@ -5855,6 +5861,7 @@ paths:
type: object
plugin:
type: string
retry: {}
status:
type: string
tx: {}
Expand Down Expand Up @@ -5920,6 +5927,78 @@ paths:
type: object
plugin:
type: string
retry: {}
status:
type: string
tx: {}
type:
enum:
- blockchain_batch_pin
- blockchain_invoke
- sharedstorage_batch_broadcast
- dataexchange_batch_send
- dataexchange_blob_send
- token_create_pool
- token_activate_pool
- token_transfer
- token_approval
type: string
updated: {}
type: object
description: Success
default:
description: ""
/namespaces/{ns}/operations/{opid}/retry:
post:
description: 'TODO: Description'
operationId: postOpRetry
parameters:
- description: 'TODO: Description'
in: path
name: ns
required: true
schema:
example: default
type: string
- description: 'TODO: Description'
in: path
name: opid
required: true
schema:
type: string
- description: Server-side request timeout (millseconds, or set a custom suffix
like 10s)
in: header
name: Request-Timeout
schema:
default: 120s
type: string
requestBody:
content:
application/json:
schema:
type: object
responses:
"202":
content:
application/json:
schema:
properties:
created: {}
error:
type: string
id: {}
input:
additionalProperties: {}
type: object
namespace:
type: string
output:
additionalProperties: {}
type: object
plugin:
type: string
retry: {}
status:
type: string
tx: {}
Expand Down Expand Up @@ -8795,6 +8874,7 @@ paths:
type: object
plugin:
type: string
retry: {}
status:
type: string
tx: {}
Expand Down
52 changes: 52 additions & 0 deletions internal/apiserver/route_post_op_retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"context"
"net/http"

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/oapispec"
"github.com/hyperledger/firefly/pkg/fftypes"
)

var postOpRetry = &oapispec.Route{
Name: "postOpRetry",
Path: "namespaces/{ns}/operations/{opid}/retry",
Method: http.MethodPost,
PathParams: []*oapispec.PathParam{
{Name: "ns", ExampleFromConf: config.NamespacesDefault, Description: i18n.MsgTBD},
{Name: "opid", Description: i18n.MsgTBD},
},
QueryParams: []*oapispec.QueryParam{},
FilterFactory: nil,
Description: i18n.MsgTBD,
JSONInputValue: func() interface{} { return &fftypes.EmptyInput{} },
JSONInputMask: nil,
JSONInputSchema: func(ctx context.Context) string { return emptyObjectSchema },
JSONOutputValue: func() interface{} { return &fftypes.Operation{} },
JSONOutputCodes: []int{http.StatusAccepted},
JSONHandler: func(r *oapispec.APIRequest) (output interface{}, err error) {
opid, err := fftypes.ParseUUID(r.Ctx, r.PP["opid"])
if err != nil {
return nil, err
}
return getOr(r.Ctx).Operations().RetryOperation(r.Ctx, r.PP["ns"], opid)
},
}
62 changes: 62 additions & 0 deletions internal/apiserver/route_post_op_retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright © 2021 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"bytes"
"encoding/json"
"net/http/httptest"
"testing"

"github.com/hyperledger/firefly/mocks/operationmocks"
"github.com/hyperledger/firefly/pkg/fftypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestPostOpRetry(t *testing.T) {
o, r := newTestAPIServer()
mom := &operationmocks.Manager{}
o.On("Operations").Return(mom)
input := fftypes.EmptyInput{}
var buf bytes.Buffer
json.NewEncoder(&buf).Encode(&input)
opID := fftypes.NewUUID()
req := httptest.NewRequest("POST", "/api/v1/namespaces/ns1/operations/"+opID.String()+"/retry", &buf)
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

mom.On("RetryOperation", mock.Anything, "ns1", opID).
Return(&fftypes.Operation{}, nil)
r.ServeHTTP(res, req)

assert.Equal(t, 202, res.Result().StatusCode)
}

func TestPostOpRetryBadID(t *testing.T) {
_, r := newTestAPIServer()
input := fftypes.EmptyInput{}
var buf bytes.Buffer
json.NewEncoder(&buf).Encode(&input)
req := httptest.NewRequest("POST", "/api/v1/namespaces/ns1/operations/bad/retry", &buf)
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

r.ServeHTTP(res, req)

assert.Equal(t, 400, res.Result().StatusCode)
}
1 change: 1 addition & 0 deletions internal/apiserver/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ var routes = []*oapispec.Route{
getNamespaces,
getOpByID,
getOps,
postOpRetry,
getStatus,
getStatusBatchManager,
getSubscriptionByID,
Expand Down
59 changes: 30 additions & 29 deletions internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/identity"
"github.com/hyperledger/firefly/internal/metrics"
"github.com/hyperledger/firefly/internal/operations"
"github.com/hyperledger/firefly/internal/privatemessaging"
"github.com/hyperledger/firefly/internal/retry"
"github.com/hyperledger/firefly/internal/syncasync"
"github.com/hyperledger/firefly/internal/sysmessaging"
"github.com/hyperledger/firefly/internal/txcommon"
Expand All @@ -36,8 +36,10 @@ import (
)

type Manager interface {
fftypes.Named

CreateTokenPool(ctx context.Context, ns string, pool *fftypes.TokenPool, waitConfirm bool) (*fftypes.TokenPool, error)
ActivateTokenPool(ctx context.Context, pool *fftypes.TokenPool, event *fftypes.BlockchainEvent) error
ActivateTokenPool(ctx context.Context, pool *fftypes.TokenPool, blockchainInfo fftypes.JSONObject) error
GetTokenPools(ctx context.Context, ns string, filter database.AndFilter) ([]*fftypes.TokenPool, *database.FilterResult, error)
GetTokenPool(ctx context.Context, ns, connector, poolName string) (*fftypes.TokenPool, error)
GetTokenPoolByNameOrID(ctx context.Context, ns string, poolNameOrID string) (*fftypes.TokenPool, error)
Expand All @@ -60,8 +62,9 @@ type Manager interface {
TokenApproval(ctx context.Context, ns string, approval *fftypes.TokenApprovalInput, waitConfirm bool) (*fftypes.TokenApproval, error)
GetTokenApprovals(ctx context.Context, ns string, filter database.AndFilter) ([]*fftypes.TokenApproval, *database.FilterResult, error)

Start() error
WaitStop()
// From operations.OperationHandler
PrepareOperation(ctx context.Context, op *fftypes.Operation) (*fftypes.PreparedOperation, error)
RunOperation(ctx context.Context, op *fftypes.PreparedOperation) (complete bool, err error)
}

type assetManager struct {
Expand All @@ -74,36 +77,42 @@ type assetManager struct {
broadcast broadcast.Manager
messaging privatemessaging.Manager
tokens map[string]tokens.Plugin
retry retry.Retry
metrics metrics.Manager
operations operations.Manager
keyNormalization int
}

func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin, mm metrics.Manager) (Manager, error) {
if di == nil || im == nil || sa == nil || bm == nil || pm == nil || ti == nil {
func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin, mm metrics.Manager, om operations.Manager) (Manager, error) {
if di == nil || im == nil || sa == nil || bm == nil || pm == nil || ti == nil || mm == nil || om == nil {
return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError)
}
am := &assetManager{
ctx: ctx,
database: di,
txHelper: txcommon.NewTransactionHelper(di),
identity: im,
data: dm,
syncasync: sa,
broadcast: bm,
messaging: pm,
tokens: ti,
retry: retry.Retry{
InitialDelay: config.GetDuration(config.AssetManagerRetryInitialDelay),
MaximumDelay: config.GetDuration(config.AssetManagerRetryMaxDelay),
Factor: config.GetFloat64(config.AssetManagerRetryFactor),
},
ctx: ctx,
database: di,
txHelper: txcommon.NewTransactionHelper(di),
identity: im,
data: dm,
syncasync: sa,
broadcast: bm,
messaging: pm,
tokens: ti,
keyNormalization: identity.ParseKeyNormalizationConfig(config.GetString(config.AssetManagerKeyNormalization)),
metrics: mm,
operations: om,
}
om.RegisterHandler(ctx, am, []fftypes.OpType{
fftypes.OpTypeTokenCreatePool,
fftypes.OpTypeTokenActivatePool,
fftypes.OpTypeTokenTransfer,
fftypes.OpTypeTokenApproval,
})
return am, nil
}

func (am *assetManager) Name() string {
return "AssetManager"
}

func (am *assetManager) selectTokenPlugin(ctx context.Context, name string) (tokens.Plugin, error) {
for pluginName, plugin := range am.tokens {
if pluginName == name {
Expand Down Expand Up @@ -147,14 +156,6 @@ func (am *assetManager) GetTokenConnectors(ctx context.Context, ns string) ([]*f
return connectors, nil
}

func (am *assetManager) Start() error {
return nil
}

func (am *assetManager) WaitStop() {
// No go routines
}

func (am *assetManager) getTokenConnectorName(ctx context.Context, ns string) (string, error) {
tokenConnectors, err := am.GetTokenConnectors(ctx, ns)
if err != nil {
Expand Down
Loading