Permalink
Browse files

Use Loopback Connection in Sequencer (#1137)

* Use loopback in sequencer

* Listen on port before creating grpc server.

In order to pass a sequencer client to the sequencer server,
we need to grpc.Dial *before* the server is running.
This works because grpc.Dial is nonblocking by default.
  • Loading branch information...
gdbelvin committed Nov 29, 2018
1 parent e55394f commit 3c6f62f488cd2ba27db4e3fb545af15f25563397
@@ -18,30 +18,46 @@ import (
"context"
"database/sql"
"flag"
"net"
"time"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/google/trillian"
"github.com/google/trillian/crypto/keys/der"
"github.com/google/trillian/crypto/keyspb"
"github.com/google/trillian/monitoring/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
"github.com/google/keytransparency/core/adminserver"
"github.com/google/keytransparency/core/sequencer"
"github.com/google/keytransparency/impl/sql/directory"
"github.com/google/keytransparency/impl/sql/engine"
"github.com/google/keytransparency/impl/sql/mutationstorage"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/google/trillian"
"google.golang.org/grpc"
pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
spb "github.com/google/keytransparency/core/sequencer/sequencer_go_proto"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/google/trillian/crypto/keys/der"
"github.com/google/trillian/crypto/keyspb"
"github.com/google/trillian/monitoring/prometheus"
_ "github.com/google/trillian/crypto/keys/der/proto"
_ "github.com/google/trillian/merkle/coniks" // Register hasher
_ "github.com/google/trillian/merkle/rfc6962" // Register hasher
)
var (
keyFile = flag.String("tls-key", "genfiles/server.key", "TLS private key file")
certFile = flag.String("tls-cert", "genfiles/server.crt", "TLS cert file")
listenAddr = flag.String("addr", ":8080", "The ip:port to serve on")
metricsAddr = flag.String("metrics-addr", ":8081", "The ip:port to publish metrics on")
serverDBPath = flag.String("db", "db", "Database connection string")
// Info to connect to the trillian map and log.
mapURL = flag.String("map-url", "", "URL of Trillian Map Server")
logURL = flag.String("log-url", "", "URL of Trillian Log Server for Signed Map Heads")
mapURL = flag.String("map-url", "", "URL of Trillian Map Server")
logURL = flag.String("log-url", "", "URL of Trillian Log Server for Signed Map Heads")
refresh = flag.Duration("directory-refresh", 5*time.Second, "Time to detect new directory")
batchSize = flag.Int("batch-size", 100, "Maximum number of mutations to process per map revision")
)
@@ -70,10 +86,6 @@ func main() {
if err != nil {
glog.Exitf("Failed to connect to %v: %v", *logURL, err)
}
tlog := trillian.NewTrillianLogClient(lconn)
tmap := trillian.NewTrillianMapClient(mconn)
logAdmin := trillian.NewTrillianAdminClient(lconn)
mapAdmin := trillian.NewTrillianAdminClient(mconn)
// Database tables
sqldb := openDB()
@@ -88,43 +100,77 @@ func main() {
glog.Exitf("Failed to create directory storage object: %v", err)
}
// Create server
sequencerServer := sequencer.NewServer(
directoryStorage,
logAdmin, mapAdmin,
tlog, tmap,
mutations, mutations,
prometheus.MetricFactory{},
creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
if err != nil {
glog.Exitf("Failed to load server credentials %v", err)
}
grpcServer := grpc.NewServer(
grpc.Creds(creds),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
)
sequencerClient, stop, err := sequencer.RunAndConnect(ctx, sequencerServer)
// Listen and create empty grpc client connection.
lis, err := net.Listen("tcp", *listenAddr)
if err != nil {
glog.Exitf("error creating TCP listener: %v", err)
}
addr := lis.Addr().String()
// Non-blocking dial before we start the server.
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure())
if err != nil {
glog.Errorf("error launching sequencer server: %v", err)
glog.Exitf("error connecting to %v: %v", addr, err)
}
defer stop()
defer conn.Close()
signer := sequencer.New(
sequencerClient,
mapAdmin,
spb.RegisterKeyTransparencySequencerServer(grpcServer, sequencer.NewServer(
directoryStorage,
int32(*batchSize))
trillian.NewTrillianAdminClient(lconn),
trillian.NewTrillianAdminClient(mconn),
trillian.NewTrillianLogClient(lconn),
trillian.NewTrillianMapClient(mconn),
mutations, mutations,
spb.NewKeyTransparencySequencerClient(conn),
prometheus.MetricFactory{}))
pb.RegisterKeyTransparencyAdminServer(grpcServer, adminserver.New(
trillian.NewTrillianLogClient(lconn),
trillian.NewTrillianMapClient(mconn),
trillian.NewTrillianAdminClient(lconn),
trillian.NewTrillianAdminClient(mconn),
directoryStorage,
mutations,
func(ctx context.Context, spec *keyspb.Specification) (proto.Message, error) {
return der.NewProtoFromSpec(spec)
}))
reflection.Register(grpcServer)
grpc_prometheus.Register(grpcServer)
grpc_prometheus.EnableHandlingTimeHistogram()
keygen := func(ctx context.Context, spec *keyspb.Specification) (proto.Message, error) {
return der.NewProtoFromSpec(spec)
}
adminServer := adminserver.New(tlog, tmap, logAdmin, mapAdmin, directoryStorage, mutations, keygen)
glog.Infof("Signer starting")
// Run servers
httpServer := startHTTPServer(adminServer)
httpServer := startHTTPServer(grpcServer, addr,
pb.RegisterKeyTransparencyAdminHandlerFromEndpoint,
)
// Periodically run batch.
signer := sequencer.New(
spb.NewKeyTransparencySequencerClient(conn),
trillian.NewTrillianAdminClient(mconn),
directoryStorage,
int32(*batchSize))
cctx, cancel := context.WithCancel(context.Background())
defer cancel()
sequencer.PeriodicallyRun(ctx, time.Tick(*refresh), func(ctx context.Context) {
sequencer.PeriodicallyRun(cctx, time.Tick(*refresh), func(ctx context.Context) {
if err := signer.RunBatchForAllDirectories(ctx); err != nil {
glog.Errorf("PeriodicallyRun(RunBatchForAllDirectories): %v", err)
}
})
// Shutdown.
httpServer.Shutdown(cctx)
glog.Errorf("Signer exiting")
}
@@ -15,7 +15,6 @@
package main
import (
"flag"
"net/http"
"github.com/google/keytransparency/cmd/serverutil"
@@ -24,39 +23,16 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
_ "github.com/google/trillian/crypto/keys/der/proto"
_ "github.com/google/trillian/merkle/coniks" // Register hasher
_ "github.com/google/trillian/merkle/rfc6962" // Register hasher
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
)
var (
addr = flag.String("addr", ":8080", "The ip:port to serve on")
metricsAddr = flag.String("metrics-addr", ":8081", "The ip:port to publish metrics on")
keyFile = flag.String("tls-key", "genfiles/server.key", "TLS private key file")
certFile = flag.String("tls-cert", "genfiles/server.crt", "TLS cert file")
)
func startHTTPServer(svr pb.KeyTransparencyAdminServer) *http.Server {
func startHTTPServer(grpcServer *grpc.Server, addr string,
services ...serverutil.RegisterServiceFromEndpoint) *http.Server {
// Wire up gRPC and HTTP servers.
creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
if err != nil {
glog.Exitf("Failed to load server credentials %v", err)
}
grpcServer := grpc.NewServer(
grpc.Creds(creds),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
)
tcreds, err := credentials.NewClientTLSFromFile(*certFile, "")
if err != nil {
glog.Exitf("Failed opening cert file %v: %v", *certFile, err)
}
gwmux, err := serverutil.GrpcGatewayMux(*addr, tcreds,
pb.RegisterKeyTransparencyAdminHandlerFromEndpoint)
gwmux, err := serverutil.GrpcGatewayMux(addr, tcreds, services...)
if err != nil {
glog.Exitf("Failed setting up REST proxy: %v", err)
}
@@ -72,18 +48,13 @@ func startHTTPServer(svr pb.KeyTransparencyAdminServer) *http.Server {
}
}()
pb.RegisterKeyTransparencyAdminServer(grpcServer, svr)
reflection.Register(grpcServer)
grpc_prometheus.Register(grpcServer)
grpc_prometheus.EnableHandlingTimeHistogram()
server := &http.Server{
Addr: *addr,
Addr: addr,
Handler: serverutil.GrpcHandlerFunc(grpcServer, mux),
}
go func() {
glog.Infof("Listening on %v", *addr)
glog.Infof("Listening on %v", addr)
if err := server.ListenAndServeTLS(*certFile, *keyFile); err != nil {
glog.Errorf("ListenAndServeTLS: %v", err)
}
@@ -18,11 +18,9 @@ package sequencer
import (
"context"
"fmt"
"net"
"time"
"github.com/golang/glog"
"google.golang.org/grpc"
"github.com/google/keytransparency/core/directory"
@@ -53,43 +51,6 @@ func New(
}
}
// RunAndConnect creates a local gRPC server and returns a connected client.
func RunAndConnect(ctx context.Context, impl spb.KeyTransparencySequencerServer) (client spb.KeyTransparencySequencerClient, stop func(), startErr error) {
server := grpc.NewServer()
spb.RegisterKeyTransparencySequencerServer(server, impl)
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, func() {}, fmt.Errorf("error creating TCP listener: %v", err)
}
defer func() {
if startErr != nil {
lis.Close()
}
}()
go func() {
if err := server.Serve(lis); err != nil {
glog.Errorf("server exited with error: %v", err)
}
}()
addr := lis.Addr().String()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure())
if err != nil {
return nil, func() {}, fmt.Errorf("error connecting to %v: %v", addr, err)
}
stop = func() {
server.GracefulStop()
conn.Close()
lis.Close()
}
client = spb.NewKeyTransparencySequencerClient(conn)
return client, stop, err
}
// PeriodicallyRun executes f once per tick until ctx is closed.
// Closing ctx will also stop any in-flight operation mid-way through.
func PeriodicallyRun(ctx context.Context, tickch <-chan time.Time, f func(ctx context.Context)) {
@@ -102,6 +102,7 @@ type Server struct {
batcher Batcher
trillian trillianFactory
logs LogsReader
loopback spb.KeyTransparencySequencerClient
}
// NewServer creates a new KeyTransparencySequencerServer.
@@ -113,6 +114,7 @@ func NewServer(
tmap tpb.TrillianMapClient,
batcher Batcher,
logs LogsReader,
loopback spb.KeyTransparencySequencerClient,
metricsFactory monitoring.MetricFactory,
) *Server {
once.Do(func() { createMetrics(metricsFactory) })
@@ -124,8 +126,9 @@ func NewServer(
tmap: tmap,
tlog: tlog,
},
batcher: batcher,
logs: logs,
batcher: batcher,
logs: logs,
loopback: loopback,
}
}
@@ -142,7 +145,7 @@ func (s *Server) RunBatch(ctx context.Context, in *spb.RunBatchRequest) (*empty.
}
for _, rev := range outstandingRevs {
if _, err := s.CreateRevision(ctx, &spb.CreateRevisionRequest{
if _, err := s.loopback.CreateRevision(ctx, &spb.CreateRevisionRequest{
DirectoryId: in.DirectoryId,
Revision: rev,
}); err != nil {
@@ -298,7 +301,7 @@ func (s *Server) CreateRevision(ctx context.Context, in *spb.CreateRevisionReque
mutationCount.Add(float64(len(msgs)), directoryID)
glog.Infof("CreatedRevision: rev: %v with %v mutations, root: %x", mapRoot.Revision, len(msgs), mapRoot.RootHash)
return s.PublishBatch(ctx, &spb.PublishBatchRequest{DirectoryId: directoryID})
return s.loopback.PublishBatch(ctx, &spb.PublishBatchRequest{DirectoryId: directoryID})
}
// PublishBatch copies the MapRoots of all known map revisions into the Log of MapRoots.
Oops, something went wrong.

0 comments on commit 3c6f62f

Please sign in to comment.