Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go kosu/rpc #174

Merged
merged 6 commits into from Jul 23, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

go-kosu: subscriptio and call PoC

  • Loading branch information
gchaincl committed Jul 18, 2019
commit 973177b4d1ed99df8114632078ea471ce77e0e6e
@@ -0,0 +1,47 @@
package rpc

import (
"context"

"github.com/ethereum/go-ethereum/rpc"
)

type Client struct {
rpc *rpc.Client
}

func DialInProc(srv *rpc.Server) *Client {
return &Client{
rpc: rpc.DialInProc(srv),
}
}

func (c *Client) Subscribe(ctx context.Context, fn func(interface{}), query string) error {
ch := make(chan interface{})
args := []interface{}{"subscribe", query}
sub, err := c.rpc.Subscribe(ctx, "kosu", ch, args...)
if err != nil {
return err
}

go func() {
defer close(ch)
defer sub.Unsubscribe()

for {
select {
case <-ctx.Done():
return
case <-sub.Err():
return
case i := <-ch:
fn(i)
}
}
}()
return nil
}

func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
return c.rpc.Call(result, method, args...)
}
@@ -0,0 +1,13 @@
package rpc

import (
"go-kosu/abci"

"github.com/ethereum/go-ethereum/rpc"
)

func NewServer(abci *abci.Client) *rpc.Server {
srv := rpc.NewServer()
srv.RegisterName("kosu", &Service{abci: abci})
return srv
}
@@ -0,0 +1,65 @@
package rpc

import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
"time"

"go-kosu/abci"

"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
)

func TestRPCCall(t *testing.T) {
server := NewServer(nil)
client := DialInProc(server)
require.NoError(t, client.Call(nil, "kosu_foo"))
}

func TestRPCSubscription(t *testing.T) {
_, closer := startServer(t, db.NewMemDB())
defer closer()
abciClient := abci.NewHTTPClient("http://localhost:26657", nil)

server := NewServer(abciClient)
client := DialInProc(server)

fn := func(i interface{}) {
fmt.Printf("i = %+v\n", i)
}
ctx, cancel := context.WithCancel(context.Background())

err := client.Subscribe(ctx, fn, "tm.event = 'NewBlock'")
require.NoError(t, err)

time.Sleep(3 * time.Second)
cancel()
}

// TODO use tests/support.go version of startServer
func startServer(t *testing.T, db db.DB) (*abci.App, func()) {
// Create a temp dir and initialize tendermint there
dir, err := ioutil.TempDir("/tmp", "/go-kosu-go-tests_")
require.NoError(t, err)

err = abci.InitTendermintWithLogger(dir, log.NewNopLogger())
require.NoError(t, err)

// Initialize the server
app := abci.NewApp(db, dir)
app.Config.LogFormat = "none"
app.Config.LogLevel = "app:error"
srv, err := abci.StartInProcessServer(app)
require.NoError(t, err)

// nolint
return app, func() {
srv.Stop()
os.RemoveAll(dir)
}
}

This file was deleted.

@@ -0,0 +1,57 @@
package rpc

import (
"context"
"go-kosu/abci"

"github.com/ethereum/go-ethereum/rpc"
)

// Service is a RPC service
type Service struct {
abci *abci.Client
}

// NewService returns a new service given a abci client
func NewService(abci *abci.Client) *Service {
return &Service{
abci: abci,
}
}

// Subscribe subscribes to the ABCI events
// To tell which events you want, you need to provide a query.
// More information about query can be found here: https://tendermint.com/rpc/#subscribe
func (s *Service) Subscribe(ctx context.Context, query string) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}

events, closer, err := s.abci.Subscribe(ctx, query)
if err != nil {
return nil, err
}

rpcSub := notifier.CreateSubscription()
go func() {
defer closer()

for {
select {
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
case e := <-events:
notifier.Notify(rpcSub.ID, e)
}
}
}()

return rpcSub, nil
}

func (s *Service) Foo() error {
return nil
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.