Skip to content

Commit

Permalink
Add Name to Events, CustomName to Subscription (#71)
Browse files Browse the repository at this point in the history
* Add name to Events

* Allow sol 0.6.8, go1.14 tweaks for json number parsing

* allows for solc0.6.8
* go1.14 has changes to json `Number` treatment. This resulted in some
of our negative tests returning a different error message due to
alphabet input for "bad number"

* Change logEntry tx index type to hex uint

For correctness and also because go1.14 is messy
when parsing incorrect numbers. For us, when parsing our test events
from our file that has `0x0` into a "Number", the resulting slice does
not have the right data.

ref: golang/go#14702
https://github.com/ethereum/go-ethereum/blob/4bcc0a37ab70cb79b16893556cffdaad6974e7d8/core/types/log.go#L47

* Add CustomName to Subscriptions

* Fix json for event name

* Add tests with event stream name in smartcontract gw

* Sub auto-generated summary, accept name from end user

* Sets name to whatever is provided by end user, if present in the body
* Sets name to `summary`, if one is not provided by end user. summary is
auto generated and not returned in the API response
* Removed a comment from test file based on review comment
  • Loading branch information
vdamle committed Jun 17, 2020
1 parent 69de215 commit 1b66f30
Show file tree
Hide file tree
Showing 15 changed files with 76 additions and 39 deletions.
3 changes: 3 additions & 0 deletions internal/kldbind/types.go
Expand Up @@ -35,6 +35,9 @@ type HexBigInt = hexutil.Big
// HexUint64 models and serializes uint64
type HexUint64 = hexutil.Uint64

// HexUint models and serializes uint
type HexUint = hexutil.Uint

// ABIEvent is an event on the ABI
type ABIEvent = abi.Event

Expand Down
5 changes: 4 additions & 1 deletion internal/kldcontracts/rest2eth.go
Expand Up @@ -432,7 +432,10 @@ func (r *rest2eth) subscribeEvent(res http.ResponseWriter, req *http.Request, ad
address := common.HexToAddress(addrStr)
addr = &address
}
sub, err := r.subMgr.AddSubscription(req.Context(), addr, abiEvent, streamID, fromBlock)
// if the end user provided a name for the subscription, use it
// If not provided, it will be set to a system-generated summary
name := r.fromBodyOrForm(req, body, "name")
sub, err := r.subMgr.AddSubscription(req.Context(), addr, abiEvent, streamID, fromBlock, name)
if err != nil {
r.restErrReply(res, req, err, 400)
return
Expand Down
5 changes: 3 additions & 2 deletions internal/kldcontracts/rest2eth_test.go
Expand Up @@ -154,7 +154,7 @@ func (m *mockSubMgr) ResumeStream(ctx context.Context, id string) error {
return m.err
}
func (m *mockSubMgr) DeleteStream(ctx context.Context, id string) error { return m.err }
func (m *mockSubMgr) AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock string) (*kldevents.SubscriptionInfo, error) {
func (m *mockSubMgr) AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock, name string) (*kldevents.SubscriptionInfo, error) {
m.capturedAddr = addr
return m.sub, m.err
}
Expand Down Expand Up @@ -1247,7 +1247,7 @@ func TestSubscribeNoAddressSuccess(t *testing.T) {
dispatcher := &mockREST2EthDispatcher{}
r, _, router := newTestREST2Eth(dispatcher)
sm := &mockSubMgr{
sub: &kldevents.SubscriptionInfo{ID: "sub1"},
sub: &kldevents.SubscriptionInfo{ID: "sub1", Name: "stream-without-address"},
}
r.subMgr = sm
bodyBytes, _ := json.Marshal(&map[string]string{
Expand All @@ -1262,6 +1262,7 @@ func TestSubscribeNoAddressSuccess(t *testing.T) {
err := json.NewDecoder(res.Result().Body).Decode(&reply)
assert.NoError(err)
assert.Equal("sub1", reply.ID)
assert.Equal("stream-without-address", reply.Name)
assert.Nil(sm.capturedAddr)
}

Expand Down
13 changes: 8 additions & 5 deletions internal/kldcontracts/smartcontractgw_test.go
Expand Up @@ -1594,7 +1594,7 @@ func TestAddStreamNoSubMgr(t *testing.T) {

func TestAddStreamOK(t *testing.T) {
assert := assert.New(t)
spec := &kldevents.StreamInfo{Type: "webhook"}
spec := &kldevents.StreamInfo{Type: "webhook", Name: "stream-1"}
b, _ := json.Marshal(spec)
req := httptest.NewRequest("POST", kldevents.StreamPathPrefix, bytes.NewReader(b))
res := httptest.NewRecorder()
Expand All @@ -1607,6 +1607,7 @@ func TestAddStreamOK(t *testing.T) {
json.NewDecoder(res.Body).Decode(&newSpec)
assert.Equal(200, res.Result().StatusCode)
assert.Equal("webhook", newSpec.Type)
assert.Equal("stream-1", newSpec.Name)
s.Shutdown()
}

Expand Down Expand Up @@ -1653,15 +1654,15 @@ func TestListStreams(t *testing.T) {

mockSubMgr := &mockSubMgr{
streams: []*kldevents.StreamInfo{
&kldevents.StreamInfo{
{
TimeSorted: kldmessages.TimeSorted{
CreatedISO8601: time.Now().UTC().Format(time.RFC3339),
}, ID: "earlier",
}, ID: "earlier", Name: "stream-1",
},
&kldevents.StreamInfo{
{
TimeSorted: kldmessages.TimeSorted{
CreatedISO8601: time.Now().UTC().Add(1 * time.Hour).Format(time.RFC3339),
}, ID: "later",
}, ID: "later", Name: "stream-2",
},
},
}
Expand All @@ -1670,7 +1671,9 @@ func TestListStreams(t *testing.T) {
assert.Equal(200, res.Result().StatusCode)
assert.Equal(2, len(results))
assert.Equal("later", results[0].ID)
assert.Equal("stream-2", results[0].Name)
assert.Equal("earlier", results[1].ID)
assert.Equal("stream-1", results[1].Name)
}

func TestListSubs(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions internal/kldeth/txn_test.go
Expand Up @@ -58,8 +58,8 @@ func (r *testRPCClient) CallContext(ctx context.Context, result interface{}, met
}

const (
simpleStorage = "pragma solidity >=0.4.22 <0.6.0;\n\ncontract simplestorage {\nuint public storedData;\n\nconstructor(uint initVal) public {\nstoredData = initVal;\n}\n\nfunction set(uint x) public {\nstoredData = x;\n}\n\nfunction get() public view returns (uint retVal) {\nreturn storedData;\n}\n}"
twoContracts = "pragma solidity >=0.4.22 <0.6.0;\n\ncontract contract1 {function f1() public pure returns (uint retVal) {\nreturn 1;\n}\n}\n\ncontract contract2 {function f2() public pure returns (uint retVal) {\nreturn 2;\n}\n}"
simpleStorage = "pragma solidity >=0.4.22 <0.6.9;\n\ncontract simplestorage {\nuint public storedData;\n\nconstructor(uint initVal) public {\nstoredData = initVal;\n}\n\nfunction set(uint x) public {\nstoredData = x;\n}\n\nfunction get() public view returns (uint retVal) {\nreturn storedData;\n}\n}"
twoContracts = "pragma solidity >=0.4.22 <0.6.9;\n\ncontract contract1 {function f1() public pure returns (uint retVal) {\nreturn 1;\n}\n}\n\ncontract contract2 {function f2() public pure returns (uint retVal) {\nreturn 2;\n}\n}"
)

func TestNewContractDeployTxnSimpleStorage(t *testing.T) {
Expand Down Expand Up @@ -446,7 +446,7 @@ func testComplexParam(t *testing.T, solidityType string, val interface{}, expect
assert := assert.New(t)

var msg kldmessages.DeployContract
msg.Solidity = "pragma solidity >=0.4.22 <0.6.0; contract test {constructor(" + solidityType + " p1) public {}}"
msg.Solidity = "pragma solidity >=0.4.22 <0.6.9; contract test {constructor(" + solidityType + " p1) public {}}"
msg.Parameters = []interface{}{val}
msg.From = "0xAA983AD2a0e0eD8ac639277F37be42F2A5d2618c"
msg.Nonce = "123"
Expand Down
3 changes: 2 additions & 1 deletion internal/kldevents/eventstream.go
Expand Up @@ -55,6 +55,7 @@ const (
type StreamInfo struct {
kldmessages.TimeSorted
ID string `json:"id"`
Name string `json:"name,omitempty"`
Path string `json:"path"`
Suspended bool `json:"suspended"`
Type string `json:"type,omitempty"`
Expand Down Expand Up @@ -422,7 +423,7 @@ func (a *eventStream) performActionWithRetry(batchNumber uint64, events []*event
complete := false
for !a.suspendOrStop() && !complete {
if attempt > 0 {
log.Infof("%s: Watiting %.2fs before re-attempting batch %d", a.spec.ID, delay.Seconds(), batchNumber)
log.Infof("%s: Waiting %.2fs before re-attempting batch %d", a.spec.ID, delay.Seconds(), batchNumber)
time.Sleep(delay)
delay = time.Duration(float64(delay) * a.backoffFactor)
}
Expand Down
18 changes: 17 additions & 1 deletion internal/kldevents/eventstream_test.go
Expand Up @@ -180,6 +180,21 @@ func TestBatchSizeCap(t *testing.T) {
defer stream.stop()

assert.Equal(uint64(MaxBatchSize), stream.spec.BatchSize)
assert.Equal("", stream.spec.Name)
}

func TestStreamName(t *testing.T) {
assert := assert.New(t)
_, stream, svr, eventStream := newTestStreamForBatching(
&StreamInfo{
Name: "testStream",
Webhook: &webhookAction{},
}, 200)
defer close(eventStream)
defer svr.Close()
defer stream.stop()

assert.Equal("testStream", stream.spec.Name)
}

func TestBlockingBehavior(t *testing.T) {
Expand Down Expand Up @@ -405,7 +420,8 @@ func setupTestSubscription(assert *assert.Assertions, sm *subscriptionMGR, strea
}
addr := kldbind.HexToAddress("0x167f57a13a9c35ff92f0649d2be0e52b4f8ac3ca")
ctx := context.Background()
s, _ := sm.AddSubscription(ctx, &addr, event, stream.spec.ID, "")
subscriptionName := "testSub"
s, _ := sm.AddSubscription(ctx, &addr, event, stream.spec.ID, "", subscriptionName)
return s
}

Expand Down
3 changes: 1 addition & 2 deletions internal/kldevents/logprocessor.go
Expand Up @@ -15,7 +15,6 @@
package kldevents

import (
"encoding/json"
"math/big"
"strconv"
"strings"
Expand All @@ -32,7 +31,7 @@ import (
type logEntry struct {
Address kldbind.Address `json:"address"`
BlockNumber kldbind.HexBigInt `json:"blockNumber"`
TransactionIndex json.Number `json:"transactionIndex"`
TransactionIndex kldbind.HexUint `json:"transactionIndex"`
TransactionHash kldbind.Hash `json:"transactionHash"`
Data string `json:"data"`
Topics []*kldbind.Hash `json:"topics"`
Expand Down
4 changes: 2 additions & 2 deletions internal/kldevents/submanager.go
Expand Up @@ -52,7 +52,7 @@ type SubscriptionManager interface {
SuspendStream(ctx context.Context, id string) error
ResumeStream(ctx context.Context, id string) error
DeleteStream(ctx context.Context, id string) error
AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock string) (*SubscriptionInfo, error)
AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock, name string) (*SubscriptionInfo, error)
Subscriptions(ctx context.Context) []*SubscriptionInfo
SubscriptionByID(ctx context.Context, id string) (*SubscriptionInfo, error)
DeleteSubscription(ctx context.Context, id string) error
Expand Down Expand Up @@ -125,7 +125,7 @@ func (s *subscriptionMGR) Subscriptions(ctx context.Context) []*SubscriptionInfo
}

// AddSubscription adds a new subscription
func (s *subscriptionMGR) AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock string) (*SubscriptionInfo, error) {
func (s *subscriptionMGR) AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock, name string) (*SubscriptionInfo, error) {
i := &SubscriptionInfo{
TimeSorted: kldmessages.TimeSorted{
CreatedISO8601: time.Now().UTC().Format(time.RFC3339),
Expand Down
11 changes: 6 additions & 5 deletions internal/kldevents/submanager_test.go
Expand Up @@ -107,6 +107,7 @@ func TestInitLevelDBFail(t *testing.T) {
func TestActionAndSubscriptionLifecyle(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
subscriptionName := "testSub"
defer cleanup(t, dir)
sm := newTestSubscriptionManager()
sm.rpc = kldeth.NewMockRPCClientForSync(nil, nil)
Expand All @@ -123,7 +124,7 @@ func TestActionAndSubscriptionLifecyle(t *testing.T) {
})
assert.NoError(err)

sub, err := sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "ping"}, stream.ID, "")
sub, err := sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "ping"}, stream.ID, "", subscriptionName)
assert.NoError(err)
assert.Equal(stream.ID, sub.Stream)

Expand Down Expand Up @@ -195,7 +196,7 @@ func TestActionChildCleanup(t *testing.T) {
})
assert.NoError(err)

_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "ping"}, stream.ID, "12345")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "ping"}, stream.ID, "12345", "")
err = sm.DeleteStream(ctx, stream.ID)
assert.NoError(err)

Expand Down Expand Up @@ -231,11 +232,11 @@ func TestStreamAndSubscriptionErrors(t *testing.T) {
err = sm.DeleteStream(ctx, "teststream")
assert.EqualError(err, "pop")

_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "nope", "")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "nope", "", "")
assert.EqualError(err, "Stream with ID 'nope' not found")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "teststream", "")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "teststream", "", "test")
assert.EqualError(err, "Failed to store subscription: pop")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "teststream", "!bad integer")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "teststream", "!bad integer", "")
assert.EqualError(err, "FromBlock cannot be parsed as a BigInt")
sm.subscriptions["testsub"] = &subscription{info: &SubscriptionInfo{}, rpc: sm.rpc}
err = sm.DeleteSubscription(ctx, "nope")
Expand Down
14 changes: 10 additions & 4 deletions internal/kldevents/subscription.go
Expand Up @@ -45,7 +45,8 @@ type SubscriptionInfo struct {
kldmessages.TimeSorted
ID string `json:"id,omitempty"`
Path string `json:"path"`
Name string `json:"name"`
Summary string `json:"-"` // System generated name for the subscription
Name string `json:"name"` // User provided name for the subscription, set to Summary if missing
Stream string `json:"stream"`
Filter persistedFilter `json:"filter"`
Event kldbind.MarshalledABIEvent `json:"event"`
Expand Down Expand Up @@ -83,13 +84,18 @@ func newSubscription(sm subscriptionManager, rpc kldeth.RPCClient, addr *kldbind
addrStr = addr.String()
}
event := &i.Event.E
i.Name = addrStr + ":" + event.Sig()
i.Summary = addrStr + ":" + event.Sig()
// If a name was not provided by the end user, set it to the system generated summary
if i.Name == "" {
log.Debugf("No name provided for subscription, using auto-generated summary:%s", i.Summary)
i.Name = i.Summary
}
if event == nil || event.Name == "" {
return nil, klderrors.Errorf(klderrors.EventStreamsSubscribeNoEvent)
}
// For now we only support filtering on the event type
f.Topics = [][]kldbind.Hash{[]kldbind.Hash{event.ID()}}
log.Infof("Created subscription %s %s topic:%s", i.ID, i.Name, event.ID().String())
f.Topics = [][]kldbind.Hash{{event.ID()}}
log.Infof("Created subscription ID:%s name:%s topic:%s", i.ID, i.Name, event.ID().String())
return s, nil
}

Expand Down
14 changes: 9 additions & 5 deletions internal/kldevents/subscription_test.go
Expand Up @@ -77,15 +77,15 @@ func TestCreateWebhookSub(t *testing.T) {
Name: "glastonbury",
RawName: "glastonbury",
Inputs: []kldbind.ABIArgument{
kldbind.ABIArgument{
{
Name: "field",
Type: kldbind.ABITypeKnown("address"),
},
kldbind.ABIArgument{
{
Name: "tents",
Type: kldbind.ABITypeKnown("uint256"),
},
kldbind.ABIArgument{
{
Name: "mud",
Type: kldbind.ABITypeKnown("bool"),
},
Expand All @@ -105,6 +105,7 @@ func TestCreateWebhookSub(t *testing.T) {

assert.Equal(s.info.ID, s1.info.ID)
assert.Equal("*:glastonbury(address,uint256,bool)", s1.info.Name)
assert.Equal("*:glastonbury(address,uint256,bool)", s1.info.Summary)
assert.Equal(event.ID(), s.info.Filter.Topics[0][0])
}

Expand All @@ -120,11 +121,14 @@ func TestCreateWebhookSubWithAddr(t *testing.T) {
}

addr := kldbind.HexToAddress("0x0123456789abcDEF0123456789abCDef01234567")
s, err := newSubscription(m, rpc, &addr, testSubInfo(event))
subInfo := testSubInfo(event)
subInfo.Name = "mySubscription"
s, err := newSubscription(m, rpc, &addr, subInfo)
assert.NoError(err)
assert.NotEmpty(s.info.ID)
assert.Equal(event.ID(), s.info.Filter.Topics[0][0])
assert.Equal("0x0123456789abcDEF0123456789abCDef01234567:devcon()", s.info.Name)
assert.Equal("0x0123456789abcDEF0123456789abCDef01234567:devcon()", s.info.Summary)
assert.Equal("mySubscription", s.info.Name)
}

func TestCreateSubscriptionNoEvent(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/kldrest/webhookskafka_test.go
Expand Up @@ -374,7 +374,7 @@ func TestWebhookHandlerYAMLDeployContract(t *testing.T) {
" type: DeployContract\n" +
"from: '0x4b098809E68C88e26442491c57866b7D4852216c'\n" +
"solidity: |-\n" +
" pragma solidity >=0.4.22 <0.6.0;\n" +
" pragma solidity >=0.4.22 <0.6.9;\n" +
" \n" +
" contract simplestorage {\n" +
" uint public storedData;\n" +
Expand Down
10 changes: 5 additions & 5 deletions internal/kldtx/txnprocessor_test.go
Expand Up @@ -78,15 +78,15 @@ const testFromAddr = "0x83dBC8e329b38cBA0Fc4ed99b1Ce9c2a390ABdC1"

var goodDeployTxnJSON = "{" +
" \"headers\":{\"type\": \"DeployContract\"}," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.0; contract t {constructor() public {}}\"," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.9; contract t {constructor() public {}}\"," +
" \"from\":\"" + testFromAddr + "\"," +
" \"nonce\":\"123\"," +
" \"gas\":\"123\"" +
"}"

var goodHDWalletDeployTxnJSON = "{" +
" \"headers\":{\"type\": \"DeployContract\"}," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.0; contract t {constructor() public {}}\"," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.9; contract t {constructor() public {}}\"," +
" \"from\":\"hd-testinst-testwallet-1234\"," +
" \"nonce\":\"123\"," +
" \"gas\":\"123\"" +
Expand All @@ -101,7 +101,7 @@ var goodSendTxnJSON = "{" +

var goodDeployTxnPrivateJSON = "{" +
" \"headers\":{\"type\": \"DeployContract\"}," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.0; contract t {constructor() public {}}\"," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.9; contract t {constructor() public {}}\"," +
" \"from\":\"" + testFromAddr + "\"," +
" \"nonce\":\"123\"," +
" \"gas\":\"123\"," +
Expand Down Expand Up @@ -608,7 +608,7 @@ func TestOnSendTransactionMessageBadNonce(t *testing.T) {
testTxnContext.jsonMsg = "{" +
" \"headers\":{\"type\": \"SendTransaction\"}," +
" \"from\":\"0x83dBC8e329b38cBA0Fc4ed99b1Ce9c2a390ABdC1\"," +
" \"nonce\":\"abc\"" +
" \"nonce\":\"123.4\"" +
"}"
txnProcessor.OnMessage(testTxnContext)
for len(testTxnContext.errorReplies) == 0 {
Expand All @@ -630,7 +630,7 @@ func TestOnSendTransactionMessageBadMsg(t *testing.T) {
" \"headers\":{\"type\": \"SendTransaction\"}," +
" \"from\":\"0x83dBC8e329b38cBA0Fc4ed99b1Ce9c2a390ABdC1\"," +
" \"nonce\":\"123\"," +
" \"value\":\"abc\"," +
" \"value\":\"123.456\"," +
" \"method\":{\"name\":\"test\"}" +
"}"
txnProcessor.OnMessage(testTxnContext)
Expand Down
4 changes: 2 additions & 2 deletions test/simpleevents.sol
@@ -1,4 +1,4 @@
pragma solidity >=0.5.2 <0.6.0;
pragma solidity >=0.5.2 <0.6.9;
/**
* @title Simple Storage with events
* @dev Read and write values to the chain
Expand Down Expand Up @@ -42,4 +42,4 @@ contract SimpleEvents {
return (storedI, storedS);
}

}
}

0 comments on commit 1b66f30

Please sign in to comment.