Skip to content

Commit

Permalink
feat: global swap info stream (#124)
Browse files Browse the repository at this point in the history
* feat: global swap info stream

* fix: place test db in tempdir
  • Loading branch information
jackstar12 committed Mar 25, 2024
1 parent d23447c commit 67c653b
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 20 deletions.
1 change: 1 addition & 0 deletions boltzrpc/boltzrpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ service Boltz {

/*
Returns the entire history of the swap if is still pending and streams updates in real time.
If the swap id is empty or "*" updates for all swaps will be streamed.
*/
rpc GetSwapInfoStream (GetSwapInfoRequest) returns (stream GetSwapInfoResponse);

Expand Down
2 changes: 2 additions & 0 deletions boltzrpc/boltzrpc_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 44 additions & 4 deletions cmd/boltzd/boltzd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func loadConfig(t *testing.T) *config.Config {
dataDir := "test"
cfg, err := config.LoadConfig(dataDir)
require.NoError(t, err)
cfg.Database.Path = "file:test.db?cache=shared&mode=memory"
cfg.Database.Path = t.TempDir() + "/boltz.db"
cfg.Node = "cln"
cfg.Node = "lnd"
return cfg
Expand Down Expand Up @@ -220,6 +220,46 @@ func TestGetInfo(t *testing.T) {
}
}

func TestGetSwapInfoStream(t *testing.T) {
client, _, stop := setup(t, nil, "")
defer stop()

stream, err := client.GetSwapInfoStream("")
require.NoError(t, err)

updates := make(chan *boltzrpc.GetSwapInfoResponse)
go func() {
for {
status, err := stream.Recv()
if err != nil {
close(updates)
return
}
updates <- status
}
}()

swap, err := client.CreateSwap(&boltzrpc.CreateSwapRequest{})
require.NoError(t, err)

select {
case info := <-updates:
require.Equal(t, swap.Id, info.Swap.Id)
case <-time.After(2 * time.Second):
require.Fail(t, "no swap update received")
}

reverseSwap, err := client.CreateReverseSwap(&boltzrpc.CreateReverseSwapRequest{Amount: 100000})
require.NoError(t, err)

select {
case info := <-updates:
require.Equal(t, reverseSwap.Id, info.ReverseSwap.Id)
case <-time.After(2 * time.Second):
require.Fail(t, "no reverse swap update received")
}
}

func TestGetPairs(t *testing.T) {
cfg := loadConfig(t)
client, _, stop := setup(t, cfg, "")
Expand Down Expand Up @@ -378,12 +418,12 @@ func TestSwap(t *testing.T) {
SendFromInternal: true,
})
require.NoError(t, err)
info, err := client.GetSwapInfo(swap.Id)
require.NoError(t, err)

stream := swapStream(t, client, swap.Id)
info := stream(boltzrpc.SwapState_PENDING)
require.Equal(t, invoice.PaymentRequest, info.Swap.Invoice)

test.MineBlock()
stream := swapStream(t, client, swap.Id)
stream(boltzrpc.SwapState_SUCCESSFUL)

paid, err := node.CheckInvoicePaid(invoice.PaymentHash)
Expand Down
2 changes: 1 addition & 1 deletion docs/grpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Gets all available information about a swap from the database.

#### GetSwapInfoStream

Returns the entire history of the swap if is still pending and streams updates in real time.
Returns the entire history of the swap if is still pending and streams updates in real time. If the swap id is empty or "*" updates for all swaps will be streamed.

| Request | Response |
| ------- | -------- |
Expand Down
11 changes: 11 additions & 0 deletions nursery/nursery.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Nursery struct {

eventListeners map[string]swapListener
eventListenersLock sync.RWMutex
globalListener swapListener
waitGroup sync.WaitGroup
stop *utils.ChannelForwarder[bool]

Expand All @@ -45,6 +46,7 @@ type SwapUpdate struct {
type swapListener = *utils.ChannelForwarder[SwapUpdate]

func (nursery *Nursery) sendUpdate(id string, update SwapUpdate) {
nursery.globalListener.Send(update)
if listener, ok := nursery.eventListeners[id]; ok {
listener.Send(update)
logger.Debugf("Sent update for swap %s", id)
Expand All @@ -69,6 +71,13 @@ func (nursery *Nursery) SwapUpdates(id string) (<-chan SwapUpdate, func()) {
return nil, nil
}

func (nursery *Nursery) GlobalSwapUpdates() (<-chan SwapUpdate, func()) {
updates := nursery.globalListener.Get()
return updates, func() {
nursery.globalListener.Remove(updates)
}
}

func (nursery *Nursery) Init(
network *boltz.Network,
lightning lightning.LightningNode,
Expand All @@ -82,6 +91,7 @@ func (nursery *Nursery) Init(
nursery.database = database
nursery.onchain = chain
nursery.eventListeners = make(map[string]swapListener)
nursery.globalListener = utils.ForwardChannel(make(chan SwapUpdate), 0, false)
nursery.stop = utils.ForwardChannel(make(chan bool), 0, false)
nursery.boltzWs = boltz.NewBoltzWebsocket(boltzClient.URL)

Expand All @@ -106,6 +116,7 @@ func (nursery *Nursery) Stop() {
for id := range nursery.eventListeners {
nursery.removeSwapListener(id)
}
nursery.globalListener.Close()
logger.Debugf("Closed all event listeners")
nursery.boltzWs.Close()
nursery.waitGroup.Wait()
Expand Down
38 changes: 23 additions & 15 deletions rpcserver/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,25 +227,33 @@ func (server *routedBoltzServer) GetSwapInfo(_ context.Context, request *boltzrp
}

func (server *routedBoltzServer) GetSwapInfoStream(request *boltzrpc.GetSwapInfoRequest, stream boltzrpc.Boltz_GetSwapInfoStreamServer) error {
logger.Info("Starting Swap info stream for " + request.Id)
info, err := server.GetSwapInfo(context.Background(), request)
if err != nil {
return handleError(err)
}
var updates <-chan nursery.SwapUpdate
var stop func()

updates, stop := server.nursery.SwapUpdates(request.Id)
if updates != nil {
for update := range updates {
if err := stream.Send(&boltzrpc.GetSwapInfoResponse{
Swap: serializeSwap(update.Swap),
ReverseSwap: serializeReverseSwap(update.ReverseSwap),
}); err != nil {
stop()
if request.Id == "" || request.Id == "*" {
logger.Info("Starting global Swap info stream")
updates, stop = server.nursery.GlobalSwapUpdates()
} else {
logger.Info("Starting Swap info stream for " + request.Id)
updates, stop = server.nursery.SwapUpdates(request.Id)
if updates == nil {
info, err := server.GetSwapInfo(context.Background(), request)
if err != nil {
return handleError(err)
}
if err := stream.Send(info); err != nil {
return handleError(err)
}
return nil
}
} else {
if err := stream.Send(info); err != nil {
}

for update := range updates {
if err := stream.Send(&boltzrpc.GetSwapInfoResponse{
Swap: serializeSwap(update.Swap),
ReverseSwap: serializeReverseSwap(update.ReverseSwap),
}); err != nil {
stop()
return handleError(err)
}
}
Expand Down

0 comments on commit 67c653b

Please sign in to comment.