Skip to content

Commit

Permalink
ARROW-17553: [Go] Enable flight.Server to register additional grpc se…
Browse files Browse the repository at this point in the history
…rvices (#13995)

This PR exposes `grpc.ServiceRegistrar` and `reflection.ServiceInfoProvider` in `grpc.Server`. They are needed to register additional gRPC services such as health check and reflection.

Authored-by: Jie Zhang <zhangjie@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
Jie Zhang committed Aug 29, 2022
1 parent b8c04c8 commit 07e7009
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
41 changes: 41 additions & 0 deletions go/arrow/flight/flight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -308,6 +311,44 @@ func TestServer(t *testing.T) {
}
}

func TestServerWithAdditionalServices(t *testing.T) {
f := &flightServer{}
f.SetAuthHandler(&servAuth{})

s := flight.NewFlightServer()
s.Init("localhost:0")
s.RegisterFlightService(f)

// Enable health check.
grpc_health_v1.RegisterHealthServer(s, health.NewServer())

// Enable reflection for grpcurl.
reflection.Register(s)

go s.Serve()
defer s.Shutdown()

// Flight client should not be affected by the additional services.
flightClient, err := flight.NewFlightClient(s.Addr().String(), &clientAuth{}, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Error(err)
}
defer flightClient.Close()

// Make sure health check is working.
conn, err := grpc.Dial(s.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Error(err)
}
defer conn.Close()

healthClient := grpc_health_v1.NewHealthClient(conn)
_, err = healthClient.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
if err != nil {
t.Error(err)
}
}

type flightMetadataWriterServer struct {
flight.BaseFlightServer
}
Expand Down
17 changes: 16 additions & 1 deletion go/arrow/flight/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
package flight

import (
context "context"
"context"
"net"
"os"
"os/signal"

"github.com/apache/arrow/go/v10/arrow/flight/internal/flight"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

type (
Expand Down Expand Up @@ -79,6 +80,12 @@ type Server interface {
// RegisterFlightService sets up the handler for the Flight Endpoints as per
// normal Grpc setups
RegisterFlightService(FlightServer)
// ServiceRegistrar wraps a single method that supports service registration.
// For example, it may be used to register health check provided by grpc-go.
grpc.ServiceRegistrar
// ServiceInfoProvider is an interface used to retrieve metadata about the services to expose.
// If reflection is enabled on the server, all the endpoints can be invoked using grpcurl.
reflection.ServiceInfoProvider
}

// BaseFlightServer is the base flight server implementation and must be
Expand Down Expand Up @@ -250,3 +257,11 @@ func (s *server) RegisterFlightService(svc FlightServer) {
func (s *server) Shutdown() {
s.server.GracefulStop()
}

func (s *server) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
s.server.RegisterService(sd, ss)
}

func (s *server) GetServiceInfo() map[string]grpc.ServiceInfo {
return s.server.GetServiceInfo()
}

0 comments on commit 07e7009

Please sign in to comment.