Skip to content

Commit

Permalink
Start WalletService from WalletLoaderService.
Browse files Browse the repository at this point in the history
  • Loading branch information
jrick committed Jan 25, 2016
1 parent 232fbc2 commit 30ee424
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 28 deletions.
19 changes: 3 additions & 16 deletions btcwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,6 @@ func walletMain() error {
}
}()
loader.RunAfterLoad(func(w *wallet.Wallet, db walletdb.DB) {
// TODO: If main.cfg.NoInitialLoad is set, the rpc client will
// have to be created elsewhere (right now, this is only done
// elsewhere by the WalletLoaderService, but other systems may
// share the responsibility later). Once both this happens and
// the wallet is loaded, the WalletService gRPC service must
// also be started. This cannot currently be performed by the
// code in the main package as it has no access to that client
// nor is it signaled when it is created. If the service is
// started there, this information must be made available. It
// also can not be done from the rpcserver package as that
// package only contains implementations of the services
// themselves and not a server that it can register another
// service with.

if legacyRPCServer != nil {
legacyRPCServer.RegisterWallet(w)
}
Expand All @@ -134,14 +120,15 @@ func walletMain() error {
// Shutdown the server(s) when interrupt signal is received.
if rpcs != nil {
addInterruptHandler(func() {
// TODO: Does this need to wait for the grpc server to
// finish up any requests?
log.Warn("Stopping RPC server...")
rpcs.Stop()
log.Info("RPC server shutdown")
})
}
if legacyRPCServer != nil {
// The legacy RPC server is able to request process shutdown by
// means of the 'stop' RPC. If this is triggered, execute
// shutdown as it would happen due to an interrupt.
go func() {
<-legacyRPCServer.RequestProcessShutdown()
simulateInterrupt()
Expand Down
51 changes: 39 additions & 12 deletions rpc/rpcserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ func errorCode(err error) codes.Code {
type walletServer struct {
wallet *wallet.Wallet

// This will be replaced with a more generic syncing service in future commits.
// This will be replaced with a more generic syncing service in future
// commits. The client initially may be initialized with nil when the
// service is instantiated and the service may be associated with a
// client at a later time.
rpcClient *chain.RPCClient
rpcClientMu sync.Mutex
}
Expand All @@ -113,11 +116,17 @@ type walletServer struct {
// underlying RPC client used for network services and synchronization.
type WalletServer walletServer

func (s *walletServer) RPCClient() *chain.RPCClient {
// RequireRPCClient returns the service's associated RPC client if it exists,
// and an error if no client exists yet.
func (s *walletServer) RequireRPCClient() (*chain.RPCClient, error) {
s.rpcClientMu.Lock()
c := s.rpcClient
s.rpcClientMu.Unlock()
return c
if c == nil {
return nil, grpc.Errorf(codes.FailedPrecondition,
"The wallet service is not associated with a consensus RPC server")
}
return c, nil
}

// SetRPCClient associates the server with a new RPC client. This does not
Expand All @@ -132,15 +141,19 @@ func (s *WalletServer) SetRPCClient(c *chain.RPCClient) {
// loaderServer provides RPC clients with the ability to load and close wallets,
// as well as establishing a RPC connection to a btcd consensus server.
type loaderServer struct {
loader *wallet.Loader
activeNet *netparams.Params
rpcClient *chain.RPCClient
mu sync.Mutex
loader *wallet.Loader
activeNet *netparams.Params
server *grpc.Server // Used to start wallet service.
walletServer *WalletServer // Needed to associate RPC client.
rpcClient *chain.RPCClient
mu sync.Mutex
}

// RegisterWalletService creates a implementation of the WalletService and
// registers it with the gRPC server. This may only be called once during
// program lifetime.
// program lifetime. The RPC client is optional, but unless SetRPCClient is
// used to associate the service with an RPC client later, not all methods will
// be available and will error with FailedPrecondition.
func RegisterWalletService(server *grpc.Server, wallet *wallet.Wallet, rpcClient *chain.RPCClient) *WalletServer {
service := &walletServer{wallet: wallet, rpcClient: rpcClient}
pb.RegisterWalletServiceServer(server, service)
Expand Down Expand Up @@ -538,14 +551,19 @@ func (s *walletServer) SignTransaction(ctx context.Context, req *pb.SignTransact
func (s *walletServer) PublishTransaction(ctx context.Context, req *pb.PublishTransactionRequest) (
*pb.PublishTransactionResponse, error) {

rpcClient, err := s.RequireRPCClient()
if err != nil {
return nil, err
}

var msgTx wire.MsgTx
err := msgTx.Deserialize(bytes.NewReader(req.SignedTransaction))
err = msgTx.Deserialize(bytes.NewReader(req.SignedTransaction))
if err != nil {
return nil, grpc.Errorf(codes.InvalidArgument,
"Bytes do not represent a valid raw transaction: %v", err)
}

_, err = s.RPCClient().SendRawTransaction(&msgTx, false)
_, err = rpcClient.SendRawTransaction(&msgTx, false)
if err != nil {
return nil, translateError(err)
}
Expand Down Expand Up @@ -741,7 +759,7 @@ func (s *walletServer) AccountNotifications(req *pb.AccountNotificationsRequest,
func RegisterWalletLoaderService(server *grpc.Server, loader *wallet.Loader,
activeNet *netparams.Params) {

service := &loaderServer{loader: loader, activeNet: activeNet}
service := &loaderServer{loader: loader, activeNet: activeNet, server: server}
pb.RegisterWalletLoaderServiceServer(server, service)
}

Expand All @@ -768,6 +786,8 @@ func (s *loaderServer) CreateWallet(ctx context.Context, req *pb.CreateWalletReq
if s.rpcClient != nil {
wallet.SynchronizeRPC(s.rpcClient)
}
// nil rpc client is ok
s.walletServer = RegisterWalletService(s.server, wallet, s.rpcClient)
s.mu.Unlock()

return &pb.CreateWalletResponse{}, nil
Expand All @@ -791,6 +811,8 @@ func (s *loaderServer) OpenWallet(ctx context.Context, req *pb.OpenWalletRequest
if s.rpcClient != nil {
wallet.SynchronizeRPC(s.rpcClient)
}
// nil rpc client is ok
s.walletServer = RegisterWalletService(s.server, wallet, s.rpcClient)
s.mu.Unlock()

return &pb.OpenWalletResponse{}, nil
Expand Down Expand Up @@ -843,7 +865,7 @@ func (s *loaderServer) StartBtcdRpc(ctx context.Context, req *pb.StartBtcdRpcReq
wallet, walletLoaded := s.loader.LoadedWallet()
if walletLoaded && wallet.SynchronizingToNetwork() {
return nil, grpc.Errorf(codes.FailedPrecondition,
"wallet is loaded and already synchronizing")
"Wallet is loaded and already synchronizing")
}

rpcClient, err := chain.NewRPCClient(s.activeNet.Params, networkAddress, req.Username,
Expand All @@ -868,6 +890,11 @@ func (s *loaderServer) StartBtcdRpc(ctx context.Context, req *pb.StartBtcdRpcReq
if walletLoaded {
wallet.SynchronizeRPC(rpcClient)
}
if s.walletServer == nil {
s.walletServer = RegisterWalletService(s.server, wallet, rpcClient)
} else {
s.walletServer.SetRPCClient(rpcClient)
}

return &pb.StartBtcdRpcResponse{}, nil
}

0 comments on commit 30ee424

Please sign in to comment.