Permalink
Browse files

Fix CI (#1164)

* remove dead code

* Update kubernetes binary paths

* Slightly faster grpc matcher

* DialContext

* serveHTTPMetric in separate function

* Remove TLS config from grpc server

We are serving TLS through the http server and forwarding grpc
connections to the grpc server.

* Move runSequencer to its own function

* serveHTTPGateway with listener and dopts

Rather than opening a socket inside of serveHTTP, reuse a passed in
listener. This allows the server to listen on a predetermined socket.

* Set transport creds on grpc client

* Fixup mains

* nits
  • Loading branch information...
gdbelvin committed Jan 22, 2019
1 parent f0a84c3 commit d068f141f97aec69176ce916efc3ca34294d4cdf
@@ -116,7 +116,8 @@ func main() {
if err != nil {
glog.Exitf("Failed opening cert file %v: %v", *certFile, err)
}
gwmux, err := serverutil.GrpcGatewayMux(*addr, tcreds,
dopts := []grpc.DialOption{grpc.WithTransportCredentials(tcreds)}
gwmux, err := serverutil.GrpcGatewayMux(ctx, *addr, dopts,
mopb.RegisterMonitorHandlerFromEndpoint)
if err != nil {
glog.Exitf("Failed setting up REST proxy: %v", err)
@@ -21,7 +21,6 @@ import (
"fmt"
"net"
"os"
"strings"
"time"

"github.com/golang/glog"
@@ -44,6 +43,7 @@ import (
"github.com/google/keytransparency/impl/sql/mutationstorage"

pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
dir "github.com/google/keytransparency/core/directory"
spb "github.com/google/keytransparency/core/sequencer/sequencer_go_proto"
etcdelect "github.com/google/trillian/util/election2/etcd"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
@@ -115,13 +115,15 @@ func getElectionFactory() (election2.Factory, func()) {
func main() {
flag.Parse()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Connect to trillian log and map backends.
mconn, err := grpc.Dial(*mapURL, grpc.WithInsecure())
mconn, err := grpc.DialContext(ctx, *mapURL, grpc.WithInsecure())
if err != nil {
glog.Exitf("grpc.Dial(%v): %v", *mapURL, err)
}
lconn, err := grpc.Dial(*logURL, grpc.WithInsecure())
lconn, err := grpc.DialContext(ctx, *logURL, grpc.WithInsecure())
if err != nil {
glog.Exitf("Failed to connect to %v: %v", *logURL, err)
}
@@ -139,12 +141,7 @@ func main() {
glog.Exitf("Failed to create directory storage object: %v", err)
}

creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
if err != nil {
glog.Exitf("Failed to load server credentials %v", err)
}
grpcServer := grpc.NewServer(
grpc.Creds(creds),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
)
@@ -154,9 +151,15 @@ func main() {
if err != nil {
glog.Exitf("error creating TCP listener: %v", err)
}
addr := lis.Addr().String()
glog.Infof("Listening on %v", lis.Addr().String())
// Non-blocking dial before we start the server.
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure())
tcreds, err := credentials.NewClientTLSFromFile(*certFile, "localhost")
if err != nil {
glog.Exitf("Failed opening cert file %v: %v", *certFile, err)
}
dopts := []grpc.DialOption{grpc.WithTransportCredentials(tcreds)}
addr := lis.Addr().String()
conn, err := grpc.DialContext(ctx, addr, dopts...)
if err != nil {
glog.Exitf("error connecting to %v: %v", addr, err)
}
@@ -190,16 +193,18 @@ func main() {
glog.Infof("Signer starting")

// Run servers
httpServer := startHTTPServer(grpcServer, addr,
go serveHTTPMetric(*metricsAddr)
go serveHTTPGateway(ctx, lis, dopts, grpcServer,
pb.RegisterKeyTransparencyAdminHandlerFromEndpoint,
)
runSequencer(ctx, conn, mconn, directoryStorage)

cli, err := etcd.NewClient(strings.Split(*etcdServers, ","), 5*time.Second)
if err != nil || cli == nil {
glog.Exitf("Failed to create etcd client: %v", err)
}
// Shutdown.
glog.Errorf("Signer exiting")
}

// Periodically run batch.
func runSequencer(ctx context.Context, conn, mconn *grpc.ClientConn,
directoryStorage dir.Storage) {
electionFactory, closeFactory := getElectionFactory()
defer closeFactory()
signer := sequencer.New(
@@ -210,15 +215,9 @@ func main() {
election.NewTracker(electionFactory, 1*time.Hour, prometheus.MetricFactory{}),
)

cctx, cancel := context.WithCancel(context.Background())
defer cancel()
sequencer.PeriodicallyRun(cctx, time.Tick(*refresh), func(ctx context.Context) {
sequencer.PeriodicallyRun(ctx, time.Tick(*refresh), func(ctx context.Context) {
if err := signer.RunBatchForAllDirectories(ctx); err != nil {
glog.Errorf("PeriodicallyRun(RunBatchForAllDirectories): %v", err)
}
})

// Shutdown.
httpServer.Shutdown(cctx)
glog.Errorf("Signer exiting")
}
@@ -15,50 +15,40 @@
package main

import (
"context"
"net"
"net/http"

"github.com/google/keytransparency/cmd/serverutil"

"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

func startHTTPServer(grpcServer *grpc.Server, addr string,
services ...serverutil.RegisterServiceFromEndpoint) *http.Server {
// Wire up gRPC and HTTP servers.
tcreds, err := credentials.NewClientTLSFromFile(*certFile, "")
if err != nil {
glog.Exitf("Failed opening cert file %v: %v", *certFile, err)
func serveHTTPMetric(addr string) {
metricMux := http.NewServeMux()
metricMux.Handle("/metrics", promhttp.Handler())

glog.Infof("Hosting metrics on %v", addr)
if err := http.ListenAndServe(addr, metricMux); err != nil {
glog.Fatalf("ListenAndServeTLS(%v): %v", addr, err)
}
gwmux, err := serverutil.GrpcGatewayMux(addr, tcreds, services...)
}

func serveHTTPGateway(ctx context.Context, lis net.Listener, dopts []grpc.DialOption,
grpcServer *grpc.Server, services ...serverutil.RegisterServiceFromEndpoint) {
// Wire up gRPC and HTTP servers.
gwmux, err := serverutil.GrpcGatewayMux(ctx, lis.Addr().String(), dopts, services...)
if err != nil {
glog.Exitf("Failed setting up REST proxy: %v", err)
}

mux := http.NewServeMux()
mux.Handle("/", gwmux)

metricMux := http.NewServeMux()
metricMux.Handle("/metrics", promhttp.Handler())
go func() {
glog.Infof("Hosting metrics on %v", *metricsAddr)
if err := http.ListenAndServe(*metricsAddr, metricMux); err != nil {
glog.Fatalf("ListenAndServeTLS(%v): %v", *metricsAddr, err)
}
}()

server := &http.Server{
Addr: addr,
Handler: serverutil.GrpcHandlerFunc(grpcServer, mux),
server := &http.Server{Handler: serverutil.GrpcHandlerFunc(grpcServer, mux)}
if err := server.ServeTLS(lis, *certFile, *keyFile); err != nil {
glog.Errorf("ListenAndServeTLS: %v", err)
}

go func() {
glog.Infof("Listening on %v", addr)
if err := server.ListenAndServeTLS(*certFile, *keyFile); err != nil {
glog.Errorf("ListenAndServeTLS: %v", err)
}
}()
// Return a handle to the http server to callers can call Shutdown().
return server
}
@@ -15,6 +15,7 @@
package main

import (
"context"
"database/sql"
"flag"
"net/http"
@@ -67,6 +68,7 @@ func openDB() *sql.DB {

func main() {
flag.Parse()
ctx := context.Background()

// Open Resources.
sqldb := openDB()
@@ -149,7 +151,8 @@ func main() {
if err != nil {
glog.Exitf("Failed opening cert file %v: %v", *certFile, err)
}
gwmux, err := serverutil.GrpcGatewayMux(*addr, tcreds,
dopts := []grpc.DialOption{grpc.WithTransportCredentials(tcreds)}
gwmux, err := serverutil.GrpcGatewayMux(ctx, *addr, dopts,
pb.RegisterKeyTransparencyHandlerFromEndpoint)
if err != nil {
glog.Exitf("Failed setting up REST proxy: %v", err)
@@ -21,7 +21,6 @@ import (

"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// GrpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
@@ -30,7 +29,8 @@ func GrpcHandlerFunc(grpcServer http.Handler, otherHandler http.Handler) http.Ha
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// This is a partial recreation of gRPC's internal checks.
// https://github.com/grpc/grpc-go/blob/master/transport/handler_server.go#L62
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
if r.ProtoMajor == 2 && strings.HasPrefix(
r.Header.Get("Content-Type"), "application/grpc") {
grpcServer.ServeHTTP(w, r)
} else {
otherHandler.ServeHTTP(w, r)
@@ -42,11 +42,8 @@ func GrpcHandlerFunc(grpcServer http.Handler, otherHandler http.Handler) http.Ha
type RegisterServiceFromEndpoint func(context.Context, *runtime.ServeMux, string, []grpc.DialOption) error

// GrpcGatewayMux registers multiple gRPC services with a gRPC ServeMux
func GrpcGatewayMux(addr string, transportCreds credentials.TransportCredentials,
func GrpcGatewayMux(ctx context.Context, addr string, dopts []grpc.DialOption,
services ...RegisterServiceFromEndpoint) (*runtime.ServeMux, error) {
ctx := context.Background()

dopts := []grpc.DialOption{grpc.WithTransportCredentials(transportCreds)}

gwmux := runtime.NewServeMux()
for _, s := range services {
@@ -18,12 +18,13 @@ spec:
io.kompose.service: log-server
spec:
containers:
- command:
- /go/bin/trillian_log_server
- --mysql_uri=test:zaphod@tcp(db:3306)/test
- --rpc_endpoint=0.0.0.0:8090
- --http_endpoint=0.0.0.0:8091
- --alsologtostderr
- name: trillian-logserver
args: [
"--mysql_uri=test:zaphod@tcp(db:3306)/test",
"--rpc_endpoint=0.0.0.0:8090",
"--http_endpoint=0.0.0.0:8091",
"--alsologtostderr"
]
image: us.gcr.io/key-transparency/log-server:latest
livenessProbe:
httpGet:
@@ -18,16 +18,17 @@ spec:
io.kompose.service: log-signer
spec:
containers:
- command:
- /go/bin/trillian_log_signer
- --mysql_uri=test:zaphod@tcp(db:3306)/test
- --http_endpoint=0.0.0.0:8091
- --sequencer_guard_window=0s
- --sequencer_interval=1s
- --num_sequencers=1
- --batch_size=50
- --force_master=true
- --alsologtostderr
- name: trillian-logsigner
args: [
"--mysql_uri=test:zaphod@tcp(db:3306)/test",
"--http_endpoint=0.0.0.0:8091",
"--sequencer_guard_window=0s",
"--sequencer_interval=1s",
"--num_sequencers=1",
"--batch_size=50",
"--force_master=true",
"--alsologtostderr"
]
image: us.gcr.io/key-transparency/log-signer:latest
livenessProbe:
httpGet:
@@ -18,12 +18,13 @@ spec:
io.kompose.service: map-server
spec:
containers:
- command:
- /go/bin/trillian_map_server
- --mysql_uri=test:zaphod@tcp(db:3306)/test
- --rpc_endpoint=0.0.0.0:8090
- --http_endpoint=0.0.0.0:8091
- --alsologtostderr
- name: trillian-mapserver
args: [
"--mysql_uri=test:zaphod@tcp(db:3306)/test",
"--rpc_endpoint=0.0.0.0:8090",
"--http_endpoint=0.0.0.0:8091",
"--alsologtostderr"
]
image: us.gcr.io/key-transparency/map-server:latest
livenessProbe:
httpGet:

0 comments on commit d068f14

Please sign in to comment.