Skip to content

Commit

Permalink
feat: endpoints for gRPC watch module for LocalTxMonitor
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
  • Loading branch information
wolf31o2 committed May 8, 2024
1 parent 81b5a74 commit 388001a
Showing 1 changed file with 148 additions and 0 deletions.
148 changes: 148 additions & 0 deletions internal/utxorpc/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2024 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utxorpc

import (
"context"
"fmt"
"log"

connect "connectrpc.com/connect"
"github.com/blinklabs-io/gouroboros/ledger"
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
"github.com/blinklabs-io/snek/event"
input_chainsync "github.com/blinklabs-io/snek/input/chainsync"
watch "github.com/utxorpc/go-codegen/utxorpc/v1alpha/watch"
"github.com/utxorpc/go-codegen/utxorpc/v1alpha/watch/watchconnect"

"github.com/blinklabs-io/cardano-node-api/internal/node"
)

// watchServiceServer implements the WatchService API
type watchServiceServer struct {
watchconnect.UnimplementedWatchServiceHandler
}

// WatchTx
func (s *watchServiceServer) WatchTx(
ctx context.Context,
req *connect.Request[watch.WatchTxRequest],
stream *connect.ServerStream[watch.WatchTxResponse],
) error {
predicate := req.Msg.GetPredicate() // Predicate
fieldMask := req.Msg.GetFieldMask()
log.Printf("Got a WatchTx request with predicate %v and fieldMask %v", predicate, fieldMask)

// Setup event channel
eventChan := make(chan event.Event, 10)
connCfg := node.ConnectionConfig{
ChainSyncEventChan: eventChan,
}
// Connect to node
oConn, err := node.GetConnection(&connCfg)
if err != nil {
log.Printf("ERROR: %w", err)
return err
}
defer func() {
// Close Ouroboros connection
oConn.Close()
}()

// Get current tip
tip, err := oConn.ChainSync().Client.GetCurrentTip()
if err != nil {
log.Printf("ERROR: %w", err)
return err
}
// Start the sync with the node
err = oConn.ChainSync().Client.Sync([]ocommon.Point{tip.Point})

// Wait for events
for {
evt, ok := <-eventChan
if !ok {
log.Printf("ERROR: channel closed")
return fmt.Errorf("ERROR: channel closed")
}

switch evt.Type {
case "chainsync.block":
// Get event context to get the block chain information
context := evt.Context
if context == nil {
log.Printf("ERROR: empty block context")
return fmt.Errorf("ERROR: empty block context")
}
bc := context.(input_chainsync.BlockContext)
// Get event payload to get the block data
payload := evt.Payload
if payload == nil {
log.Printf(
"ERROR: empty payload: block: %d, slot: %d",
bc.BlockNumber,
bc.SlotNumber,
)
return fmt.Errorf(
"ERROR: empty payload: block: %d, slot: %d",
bc.BlockNumber,
bc.SlotNumber,
)
}
be := payload.(input_chainsync.BlockEvent)
// Extract the block bytes
blockBytes := []byte(be.BlockCbor)
blockType, err := ledger.DetermineBlockType(blockBytes)
if err != nil {
return fmt.Errorf(
"ERROR: failed to determine block type: block: %d, slot: %d",
bc.BlockNumber,
bc.SlotNumber,
)
}
// Get a parsed gOuroboros Block
block, err := ledger.NewBlockFromCbor(blockType, blockBytes)
if err != nil {
return fmt.Errorf(
"ERROR: failed to parse block: block: %d, slot: %d",
bc.BlockNumber,
bc.SlotNumber,
)
}

// Loop through transactions
for _, tx := range block.Transactions() {
resp := &watch.WatchTxResponse{}
var act watch.AnyChainTx
var actc watch.AnyChainTx_Cardano
cTx := tx.Utxorpc() // *cardano.Tx
actc.Cardano = cTx
act.Chain = &actc
var wtra watch.WatchTxResponse_Apply
wtra.Apply = &act
resp.Action = &wtra
if predicate == nil {
stream.Send(resp)
} else {
// TODO: filter from all Predicate types
stream.Send(resp)
}
}
}

// TODO: process events
log.Printf("event: %v", evt)
}
}

0 comments on commit 388001a

Please sign in to comment.