From 67c653bca86d2972f397cabe95fca90ca6eca778 Mon Sep 17 00:00:00 2001 From: jackstar12 <62219658+jackstar12@users.noreply.github.com> Date: Mon, 25 Mar 2024 14:58:16 +0100 Subject: [PATCH] feat: global swap info stream (#124) * feat: global swap info stream * fix: place test db in tempdir --- boltzrpc/boltzrpc.proto | 1 + boltzrpc/boltzrpc_grpc.pb.go | 2 ++ cmd/boltzd/boltzd_test.go | 48 +++++++++++++++++++++++++++++++++--- docs/grpc.md | 2 +- nursery/nursery.go | 11 +++++++++ rpcserver/router.go | 38 +++++++++++++++++----------- 6 files changed, 82 insertions(+), 20 deletions(-) diff --git a/boltzrpc/boltzrpc.proto b/boltzrpc/boltzrpc.proto index af3de419..9f121c98 100644 --- a/boltzrpc/boltzrpc.proto +++ b/boltzrpc/boltzrpc.proto @@ -51,6 +51,7 @@ service Boltz { /* Returns the entire history of the swap if is still pending and streams updates in real time. + If the swap id is empty or "*" updates for all swaps will be streamed. */ rpc GetSwapInfoStream (GetSwapInfoRequest) returns (stream GetSwapInfoResponse); diff --git a/boltzrpc/boltzrpc_grpc.pb.go b/boltzrpc/boltzrpc_grpc.pb.go index 02b17e3f..3c7a5a74 100644 --- a/boltzrpc/boltzrpc_grpc.pb.go +++ b/boltzrpc/boltzrpc_grpc.pb.go @@ -72,6 +72,7 @@ type BoltzClient interface { // Gets all available information about a swap from the database. GetSwapInfo(ctx context.Context, in *GetSwapInfoRequest, opts ...grpc.CallOption) (*GetSwapInfoResponse, error) // Returns the entire history of the swap if is still pending and streams updates in real time. + // If the swap id is empty or "*" updates for all swaps will be streamed. GetSwapInfoStream(ctx context.Context, in *GetSwapInfoRequest, opts ...grpc.CallOption) (Boltz_GetSwapInfoStreamClient, error) // Deprecated: Do not use. // @@ -400,6 +401,7 @@ type BoltzServer interface { // Gets all available information about a swap from the database. GetSwapInfo(context.Context, *GetSwapInfoRequest) (*GetSwapInfoResponse, error) // Returns the entire history of the swap if is still pending and streams updates in real time. + // If the swap id is empty or "*" updates for all swaps will be streamed. GetSwapInfoStream(*GetSwapInfoRequest, Boltz_GetSwapInfoStreamServer) error // Deprecated: Do not use. // diff --git a/cmd/boltzd/boltzd_test.go b/cmd/boltzd/boltzd_test.go index bdbf7030..5082a6df 100644 --- a/cmd/boltzd/boltzd_test.go +++ b/cmd/boltzd/boltzd_test.go @@ -47,7 +47,7 @@ func loadConfig(t *testing.T) *config.Config { dataDir := "test" cfg, err := config.LoadConfig(dataDir) require.NoError(t, err) - cfg.Database.Path = "file:test.db?cache=shared&mode=memory" + cfg.Database.Path = t.TempDir() + "/boltz.db" cfg.Node = "cln" cfg.Node = "lnd" return cfg @@ -220,6 +220,46 @@ func TestGetInfo(t *testing.T) { } } +func TestGetSwapInfoStream(t *testing.T) { + client, _, stop := setup(t, nil, "") + defer stop() + + stream, err := client.GetSwapInfoStream("") + require.NoError(t, err) + + updates := make(chan *boltzrpc.GetSwapInfoResponse) + go func() { + for { + status, err := stream.Recv() + if err != nil { + close(updates) + return + } + updates <- status + } + }() + + swap, err := client.CreateSwap(&boltzrpc.CreateSwapRequest{}) + require.NoError(t, err) + + select { + case info := <-updates: + require.Equal(t, swap.Id, info.Swap.Id) + case <-time.After(2 * time.Second): + require.Fail(t, "no swap update received") + } + + reverseSwap, err := client.CreateReverseSwap(&boltzrpc.CreateReverseSwapRequest{Amount: 100000}) + require.NoError(t, err) + + select { + case info := <-updates: + require.Equal(t, reverseSwap.Id, info.ReverseSwap.Id) + case <-time.After(2 * time.Second): + require.Fail(t, "no reverse swap update received") + } +} + func TestGetPairs(t *testing.T) { cfg := loadConfig(t) client, _, stop := setup(t, cfg, "") @@ -378,12 +418,12 @@ func TestSwap(t *testing.T) { SendFromInternal: true, }) require.NoError(t, err) - info, err := client.GetSwapInfo(swap.Id) - require.NoError(t, err) + + stream := swapStream(t, client, swap.Id) + info := stream(boltzrpc.SwapState_PENDING) require.Equal(t, invoice.PaymentRequest, info.Swap.Invoice) test.MineBlock() - stream := swapStream(t, client, swap.Id) stream(boltzrpc.SwapState_SUCCESSFUL) paid, err := node.CheckInvoicePaid(invoice.PaymentHash) diff --git a/docs/grpc.md b/docs/grpc.md index 877c3db6..e57e91cd 100644 --- a/docs/grpc.md +++ b/docs/grpc.md @@ -79,7 +79,7 @@ Gets all available information about a swap from the database. #### GetSwapInfoStream -Returns the entire history of the swap if is still pending and streams updates in real time. +Returns the entire history of the swap if is still pending and streams updates in real time. If the swap id is empty or "*" updates for all swaps will be streamed. | Request | Response | | ------- | -------- | diff --git a/nursery/nursery.go b/nursery/nursery.go index 51cb5a4d..257a5282 100644 --- a/nursery/nursery.go +++ b/nursery/nursery.go @@ -28,6 +28,7 @@ type Nursery struct { eventListeners map[string]swapListener eventListenersLock sync.RWMutex + globalListener swapListener waitGroup sync.WaitGroup stop *utils.ChannelForwarder[bool] @@ -45,6 +46,7 @@ type SwapUpdate struct { type swapListener = *utils.ChannelForwarder[SwapUpdate] func (nursery *Nursery) sendUpdate(id string, update SwapUpdate) { + nursery.globalListener.Send(update) if listener, ok := nursery.eventListeners[id]; ok { listener.Send(update) logger.Debugf("Sent update for swap %s", id) @@ -69,6 +71,13 @@ func (nursery *Nursery) SwapUpdates(id string) (<-chan SwapUpdate, func()) { return nil, nil } +func (nursery *Nursery) GlobalSwapUpdates() (<-chan SwapUpdate, func()) { + updates := nursery.globalListener.Get() + return updates, func() { + nursery.globalListener.Remove(updates) + } +} + func (nursery *Nursery) Init( network *boltz.Network, lightning lightning.LightningNode, @@ -82,6 +91,7 @@ func (nursery *Nursery) Init( nursery.database = database nursery.onchain = chain nursery.eventListeners = make(map[string]swapListener) + nursery.globalListener = utils.ForwardChannel(make(chan SwapUpdate), 0, false) nursery.stop = utils.ForwardChannel(make(chan bool), 0, false) nursery.boltzWs = boltz.NewBoltzWebsocket(boltzClient.URL) @@ -106,6 +116,7 @@ func (nursery *Nursery) Stop() { for id := range nursery.eventListeners { nursery.removeSwapListener(id) } + nursery.globalListener.Close() logger.Debugf("Closed all event listeners") nursery.boltzWs.Close() nursery.waitGroup.Wait() diff --git a/rpcserver/router.go b/rpcserver/router.go index 8970e8ca..5f6e72fc 100644 --- a/rpcserver/router.go +++ b/rpcserver/router.go @@ -227,25 +227,33 @@ func (server *routedBoltzServer) GetSwapInfo(_ context.Context, request *boltzrp } func (server *routedBoltzServer) GetSwapInfoStream(request *boltzrpc.GetSwapInfoRequest, stream boltzrpc.Boltz_GetSwapInfoStreamServer) error { - logger.Info("Starting Swap info stream for " + request.Id) - info, err := server.GetSwapInfo(context.Background(), request) - if err != nil { - return handleError(err) - } + var updates <-chan nursery.SwapUpdate + var stop func() - updates, stop := server.nursery.SwapUpdates(request.Id) - if updates != nil { - for update := range updates { - if err := stream.Send(&boltzrpc.GetSwapInfoResponse{ - Swap: serializeSwap(update.Swap), - ReverseSwap: serializeReverseSwap(update.ReverseSwap), - }); err != nil { - stop() + if request.Id == "" || request.Id == "*" { + logger.Info("Starting global Swap info stream") + updates, stop = server.nursery.GlobalSwapUpdates() + } else { + logger.Info("Starting Swap info stream for " + request.Id) + updates, stop = server.nursery.SwapUpdates(request.Id) + if updates == nil { + info, err := server.GetSwapInfo(context.Background(), request) + if err != nil { return handleError(err) } + if err := stream.Send(info); err != nil { + return handleError(err) + } + return nil } - } else { - if err := stream.Send(info); err != nil { + } + + for update := range updates { + if err := stream.Send(&boltzrpc.GetSwapInfoResponse{ + Swap: serializeSwap(update.Swap), + ReverseSwap: serializeReverseSwap(update.ReverseSwap), + }); err != nil { + stop() return handleError(err) } }