Skip to content

Commit

Permalink
api: v2 api with gRPC and gRPC-gateway
Browse files Browse the repository at this point in the history
Newly designed API defines Ancestry as a set of layers
and shrinked the api to only the most used apis:
post ancestry, get layer, get notification, delete notification

Fixes quay#98
  • Loading branch information
KeyboardNerd committed Jun 2, 2017
1 parent ed75f3f commit e87240f
Show file tree
Hide file tree
Showing 13 changed files with 2,474 additions and 40 deletions.
199 changes: 195 additions & 4 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,227 @@
package api

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cmux"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
log "github.com/sirupsen/logrus"
"github.com/tylerb/graceful"
"google.golang.org/grpc"

pb "github.com/coreos/clair/api/v2/clairpb"
"github.com/coreos/clair/api/v2/server"
"github.com/coreos/clair/database"
"github.com/coreos/clair/pkg/stopper"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/credentials"
)

const timeoutResponse = `{"Error":{"Message":"Clair failed to respond within the configured timeout window.","Type":"Timeout"}}`

// Config is the configuration for the API service.
type Config struct {
Port int
GrpcPort int
HealthPort int
Timeout time.Duration
PaginationKey string
CertFile, KeyFile, CAFile string
}

func Run(cfg *Config, store database.Datastore, st *stopper.Stopper) {
func serveWithStopper(serverName string, server *http.Server, l net.Listener, st *stopper.Stopper) {
go func() {
<-st.Chan()
log.Info("Shut down the server: %s", serverName)
server.Shutdown(context.Background())
}()

err := server.Serve(l)
errorHandle(err)
}

type loggingResponseWriter struct {
http.ResponseWriter

StatusCode int
}

func newLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
return &loggingResponseWriter{w, http.StatusOK}
}

func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.StatusCode = code
lrw.ResponseWriter.WriteHeader(code)
}

func loggingHandler(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
lrw := newLoggingResponseWriter(w)
handler.ServeHTTP(lrw, r)
statusStr := strconv.Itoa(lrw.StatusCode)
if lrw.StatusCode == 0 {
statusStr = "???"
}
log.WithFields(log.Fields{"remote addr": r.RemoteAddr, "method": r.Method, "request uri": r.RequestURI, "status": statusStr, "elapsed time": time.Since(start)}).Info("Handled HTTP request")
})
}

func errorHandle(err error) {
if err != nil {
if opErr, ok := err.(*net.OpError); !ok || (ok && opErr.Op != "accept") {
log.Fatal(err)
}
}
}

var (
promResponseDurationMilliseconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "clair_v2_api_response_duration_milliseconds",
Help: "The duration of time it takes to receieve and write a response to an V2 API request",
Buckets: prometheus.ExponentialBuckets(9.375, 2, 10),
}, []string{"route", "code"})
)

func init() {
prometheus.MustRegister(promResponseDurationMilliseconds)
}

// RunGrpcServer runs grpc server and grpc gateway server on the same port
func RunGrpcServer(cfg *Config, store database.Datastore, st *stopper.Stopper) {
defer st.End()
if cfg == nil {
log.Info("grpc API service is disabled.")
return
}

// read CA File to setup client certificate
tlsConfig, err := tlsClientConfig(cfg.CAFile)
if err != nil {
log.WithError(err).Fatal("could not initialize client cert authentication")
}

// initialize port listener and multiplexer
l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.GrpcPort))
log.WithField("addr", l.Addr().String()).Info("starting grpc server")
if err != nil {
log.WithError(err).Fatalf("could not bind to port %d", cfg.GrpcPort)
}
m := cmux.New(l)

if tlsConfig != nil {
// setup mutual certificate
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
log.WithError(err).Fatal("Failed to load certificate files")
}
tlsConfig.Certificates = []tls.Certificate{cert}
tlsConfig.NextProtos = []string{"h2"} // enable http2 support

tlsL := tls.NewListener(m.Match(cmux.Any()), tlsConfig)
go func() { errorHandle(m.Serve()) }()

opts := []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
}
// set up grpc server
grpcServer := grpc.NewServer(opts...)
pb.RegisterClairAPIServiceServer(grpcServer, &server.ClairAPIServer{PaginationKey: cfg.PaginationKey, Store: store})
// setup gateway server
dtlsConfig := tlsConfig.Clone()
// enable local traffic
dtlsConfig.InsecureSkipVerify = true
creds := credentials.NewTLS(dtlsConfig)
gwOpts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
jsonOpt := runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true})
ctx := context.Background()
gwmux := runtime.NewServeMux(jsonOpt)
err = pb.RegisterClairAPIServiceHandlerFromEndpoint(ctx, gwmux, l.Addr().String(), gwOpts)
if err != nil {
log.WithError(err).Fatal("could not initialize grpc gateway")
}
srvMux := http.NewServeMux()
srvMux.Handle("/metrics", prometheus.Handler())
srvMux.Handle("/", gwmux)
// setup handler
handler := grpcHandlerFunc(grpcServer, srvMux)
// setup server
log.Warn()
srv := &http.Server{
Handler: loggingHandler(handler),
TLSConfig: tlsConfig,
}
log.Info("grpc server running with client certificate authentication")
serveWithStopper("Grpc server ", srv, tlsL, st)
} else {
// setup insecure connection

// setup listener multiplexer
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
httpL := m.Match(cmux.Any())
go func() { errorHandle(m.Serve()) }()
opts := []grpc.ServerOption{
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
}
// setup grpc server
grpcServer := grpc.NewServer(opts...)
pb.RegisterClairAPIServiceServer(grpcServer, &server.ClairAPIServer{PaginationKey: cfg.PaginationKey, Store: store})
go func() { errorHandle(grpcServer.Serve(grpcL)) }()

// setup grpc gateway server
jsonOpt := runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true})
gwmux := runtime.NewServeMux(jsonOpt)

ctx := context.Background()
gwOpts := []grpc.DialOption{grpc.WithInsecure()}
err = pb.RegisterClairAPIServiceHandlerFromEndpoint(ctx, gwmux, grpcL.Addr().String(), gwOpts)
if err != nil {
log.WithError(err).Fatal("could not initialize grpc gateway")
}

srvMux := http.NewServeMux()
srvMux.Handle("/metrics", prometheus.Handler())
srvMux.Handle("/", gwmux)

gwServer := &http.Server{
Handler: loggingHandler(srvMux),
}
log.Warn("grpc server running: configured without client certificate authentication")
serveWithStopper("Grpc server (insecure)", gwServer, httpL, st)
}
log.Info("Grpc API stopped")
}

// Run V1 API
func Run(cfg *Config, store database.Datastore, st *stopper.Stopper) {
defer st.End()
// Do not run the API service if there is no config.
if cfg == nil {
log.Info("main API service is disabled.")
return
}
log.WithField("port", cfg.Port).Info("starting main API")

// cfg.CAFile is for
tlsConfig, err := tlsClientConfig(cfg.CAFile)
if err != nil {
log.WithError(err).Fatal("could not initialize client cert authentication")
}
if tlsConfig != nil {
log.Info("main API configured with client certificate authentication")
}

log.WithField("port", cfg.Port).Info("starting HTTP API")
srv := &graceful.Server{
Timeout: 0, // Already handled by our TimeOut middleware
NoSignalHandling: true, // We want to use our own Stopper
Expand All @@ -74,6 +251,7 @@ func Run(cfg *Config, store database.Datastore, st *stopper.Stopper) {
log.Info("main API stopped")
}

// RunHealth API
func RunHealth(cfg *Config, store database.Datastore, st *stopper.Stopper) {
defer st.End()

Expand Down Expand Up @@ -138,6 +316,7 @@ func tlsClientConfig(caPath string) (*tls.Config, error) {
return nil, err
}

// create a ca certificate pool to verify the clients
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

Expand All @@ -148,3 +327,15 @@ func tlsClientConfig(caPath string) (*tls.Config, error) {

return tlsConfig, nil
}

// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
// connections or otherHandler otherwise. Copied from cockroachdb.
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
grpcServer.ServeHTTP(w, r)
} else {
otherHandler.ServeHTTP(w, r)
}
})
}
5 changes: 2 additions & 3 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// router is an HTTP router that forwards requests to the appropriate sub-router
// depending on the API version specified in the request URI.
type router map[string]*httprouter.Router
type router map[string]http.Handler

// Let's hope we never have more than 99 API versions.
const apiVersionLength = len("v99")
Expand All @@ -52,8 +52,7 @@ func (rtr router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
router.ServeHTTP(w, r)
return
}

log.WithFields(log.Fields{"status": http.StatusNotFound, "method": r.Method, "request uri": r.RequestURI, "remote addr": r.RemoteAddr}).Info("Served HTTP request")
log.WithFields(log.Fields{"http header": r.Header, "http version": r.ProtoMajor, "status": http.StatusNotFound, "method": r.Method, "request uri": r.RequestURI, "remote addr": r.RemoteAddr}).Info("Served HTTP request")
http.NotFound(w, r)
}

Expand Down
17 changes: 17 additions & 0 deletions api/v2/clairpb/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
all:
protoc -I/usr/local/include -I. \
-I${GOPATH}/src \
-I${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
--go_out=plugins=grpc:. \
clair.proto
protoc -I/usr/local/include -I. \
-I${GOPATH}/src \
-I${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
--grpc-gateway_out=logtostderr=true:. \
clair.proto
protoc -I/usr/local/include -I. \
-I${GOPATH}/src \
-I${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
--swagger_out=logtostderr=true:. \
clair.proto
go generate .
Loading

0 comments on commit e87240f

Please sign in to comment.