diff --git a/mains/generate.go b/mains/generate.go index df5ced6f..cb09e463 100644 --- a/mains/generate.go +++ b/mains/generate.go @@ -31,6 +31,8 @@ type srvInfo struct { APIPkg string Services []*service.Data HasWS bool + HasHTTP bool + HasGRPC bool ServerName string } @@ -43,7 +45,11 @@ type svcT struct { SrvVar string GenPkg string GenHTTPPkg string + GenGRPCPkg string + GenGRPCPbPkg string HasWebSocket bool + HasHTTP bool + HasGRPC bool } // Register the plugin for the example phase. @@ -109,9 +115,20 @@ func generateExample(genpkg string, roots []eval.Root, files []*codegen.File) ([ apipkg := apiPkgAlias(genpkg, roots) if info, ok := srvMap[dir]; ok { info.HasWS = hasWS + info.HasHTTP = true if info.APIPkg == "" { info.APIPkg = apipkg } } else { - srvMap[dir] = &srvInfo{Dir: dir, APIPkg: apipkg, Services: svcs, HasWS: hasWS} + srvMap[dir] = &srvInfo{Dir: dir, APIPkg: apipkg, Services: svcs, HasWS: hasWS, HasHTTP: true} + } + } + // Detect gRPC servers from grpc.go files + for _, f := range files { + if filepath.Base(f.Path) != "grpc.go" { continue } + segs := strings.Split(filepath.ToSlash(f.Path), "/") + if len(segs) < 3 || segs[0] != "cmd" { continue } + dir := segs[1] + if info, ok := srvMap[dir]; ok { + info.HasGRPC = true } } @@ -119,11 +136,11 @@ func generateExample(genpkg string, roots []eval.Root, files []*codegen.File) ([ return files, nil } - // Filter out default example mains and http.go; we'll add our own mains. + // Filter out default example mains, http.go, and grpc.go; we'll add our own mains. var out []*codegen.File for _, f := range files { base := filepath.Base(f.Path) - if strings.HasPrefix(f.Path, "cmd/") && (base == "main.go" || base == "http.go") { + if strings.HasPrefix(f.Path, "cmd/") && (base == "main.go" || base == "http.go" || base == "grpc.go") { continue } out = append(out, f) @@ -156,6 +173,14 @@ func generateExample(genpkg string, roots []eval.Root, files []*codegen.File) ([ codegen.GoaNamedImport("http", "goahttp"), {Path: "google.golang.org/grpc/credentials/insecure"}, } + if info.HasGRPC { + specs = append(specs, + &codegen.ImportSpec{Path: "net"}, + &codegen.ImportSpec{Path: "google.golang.org/grpc"}, + &codegen.ImportSpec{Path: "google.golang.org/grpc/reflection"}, + &codegen.ImportSpec{Path: "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"}, + ) + } if info.HasWS { specs = append(specs, &codegen.ImportSpec{Path: "github.com/gorilla/websocket"}) } @@ -164,19 +189,45 @@ func generateExample(genpkg string, roots []eval.Root, files []*codegen.File) ([ scope := codegen.NewNameScope() var svcsData []svcT + httpBySvc := httpServicesByName(roots) + grpcBySvc := grpcServicesByName(roots) wsBySvc := httpWebSocketByService(roots) hasAnyWS := false + hasAnyHTTP := false + hasAnyGRPC := false for _, sd := range info.Services { genAlias := scope.Unique(sd.PkgName, "svc") - httpAlias := scope.Unique(sd.PkgName+"svr", "svr") - specs = append(specs, - &codegen.ImportSpec{Path: path.Join(genpkg, sd.PathName), Name: genAlias}, - &codegen.ImportSpec{Path: path.Join(genpkg, "http", sd.PathName, "server"), Name: httpAlias}, - ) + hasHTTP := httpBySvc[sd.Name] + hasGRPC := grpcBySvc[sd.Name] hws := wsBySvc[sd.Name] + + var httpAlias, grpcAlias, grpcPbAlias string + + // Always add the base service package + specs = append(specs, &codegen.ImportSpec{Path: path.Join(genpkg, sd.PathName), Name: genAlias}) + + // Conditionally add HTTP server imports + if hasHTTP { + httpAlias = scope.Unique(sd.PkgName+"svr", "svr") + specs = append(specs, &codegen.ImportSpec{Path: path.Join(genpkg, "http", sd.PathName, "server"), Name: httpAlias}) + hasAnyHTTP = true + } + + // Conditionally add gRPC server imports + if hasGRPC { + grpcAlias = scope.Unique(sd.PkgName+"grpc", "grpcsvc") + grpcPbAlias = scope.Unique(sd.PkgName+"pb", "pb") + specs = append(specs, + &codegen.ImportSpec{Path: path.Join(genpkg, "grpc", sd.PathName, "server"), Name: grpcAlias}, + &codegen.ImportSpec{Path: path.Join(genpkg, "grpc", sd.PathName, "pb"), Name: grpcPbAlias}, + ) + hasAnyGRPC = true + } + if hws { hasAnyWS = true } + svcsData = append(svcsData, svcT{ Name: sd.Name, StructName: sd.StructName, @@ -185,7 +236,11 @@ func generateExample(genpkg string, roots []eval.Root, files []*codegen.File) ([ SrvVar: sd.VarName + "Server", GenPkg: genAlias, GenHTTPPkg: httpAlias, + GenGRPCPkg: grpcAlias, + GenGRPCPbPkg: grpcPbAlias, HasWebSocket: hws, + HasHTTP: hasHTTP, + HasGRPC: hasGRPC, }) } @@ -195,6 +250,8 @@ func generateExample(genpkg string, roots []eval.Root, files []*codegen.File) ([ "APIPkg": info.APIPkg, "Services": svcsData, "HasAnyWebSocket": hasAnyWS, + "HasHTTP": hasAnyHTTP, + "HasGRPC": hasAnyGRPC, "ServiceCount": len(svcsData), "ServerLabel": serverLabel(roots), }}, @@ -265,7 +322,8 @@ func httpWebSocketByService(roots []eval.Root) map[string]bool { if e.SSE != nil { continue } - if e.MethodExpr != nil && e.MethodExpr.Stream != expr.NoStreamKind { + // Stream is 0 when no streaming is defined, and >= NoStreamKind (1) when streaming is used + if e.MethodExpr != nil && e.MethodExpr.Stream != 0 { hasWS[svc.Name()] = true break } @@ -286,3 +344,37 @@ func rootServer(roots []eval.Root) *expr.ServerExpr { } return nil } + +// httpServicesByName returns map of service names that have HTTP endpoints. +func httpServicesByName(roots []eval.Root) map[string]bool { + hasHTTP := map[string]bool{} + for _, r := range roots { + root, ok := r.(*expr.RootExpr) + if !ok || root.API == nil || root.API.HTTP == nil { + continue + } + for _, svc := range root.API.HTTP.Services { + if len(svc.HTTPEndpoints) > 0 { + hasHTTP[svc.Name()] = true + } + } + } + return hasHTTP +} + +// grpcServicesByName returns map of service names that have gRPC endpoints. +func grpcServicesByName(roots []eval.Root) map[string]bool { + hasGRPC := map[string]bool{} + for _, r := range roots { + root, ok := r.(*expr.RootExpr) + if !ok || root.API == nil || root.API.GRPC == nil { + continue + } + for _, svc := range root.API.GRPC.Services { + if len(svc.GRPCEndpoints) > 0 { + hasGRPC[svc.Name()] = true + } + } + } + return hasGRPC +} diff --git a/mains/templates/main.go.tpl b/mains/templates/main.go.tpl index 45736101..7852aae4 100644 --- a/mains/templates/main.go.tpl +++ b/mains/templates/main.go.tpl @@ -1,35 +1,11 @@ -package main - -import ( - "context" - "flag" - "fmt" - "net/http" - "net/http/httptrace" - "os" - "os/signal" - "sync" - "syscall" - "time" - - "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "goa.design/clue/clue" - "goa.design/clue/debug" - "goa.design/clue/health" - "goa.design/clue/log" - goahttp "goa.design/goa/v3/http" - {{- if .HasAnyWebSocket }} - "github.com/gorilla/websocket" - {{- end }} - "google.golang.org/grpc/credentials/insecure" -) - func main() { var ( + {{- if .HasHTTP }} httpaddr = flag.String("http-addr", ":8080", "HTTP listen address") + {{- end }} + {{- if .HasGRPC }} + grpcaddr = flag.String("grpc-addr", ":9090", "gRPC listen address") + {{- end }} metricsAddr = flag.String("metrics-addr", ":8081", "metrics listen address") coladdr = flag.String("otel-addr", ":4317", "OpenTelemetry collector listen address") debugf = flag.Bool("debug", false, "Enable debug logs") @@ -126,6 +102,7 @@ func main() { {{ .EpVar }}.Use(log.Endpoint) {{- end }} + {{- if .HasHTTP }} // 6. Create HTTP transport mux := goahttp.NewMuxer() debug.MountDebugLogEnabler(debug.Adapt(mux)) @@ -139,6 +116,7 @@ func main() { {{- end }} {{- range .Services }} + {{- if .HasHTTP }} // {{ .Name }} HTTP server {{- if .HasWebSocket }} {{ .SrvVar }} := {{ .GenHTTPPkg }}.New({{ .EpVar }}, mux, goahttp.RequestDecoder, goahttp.ResponseEncoder, nil, nil, upgrader, nil) @@ -150,10 +128,37 @@ func main() { log.Print(ctx, log.KV{K: "method", V: m.Method}, log.KV{K: "endpoint", V: m.Verb + " " + m.Pattern}) } {{- end }} + {{- end }} httpServer := &http.Server{Addr: *httpaddr, Handler: handler} + {{- end }} + + {{- if .HasGRPC }} + // 6b. Create gRPC server with interceptors + var grpcServerOpts []grpc.ServerOption + grpcServerOpts = append(grpcServerOpts, grpc.StatsHandler(otelgrpc.NewServerHandler())) + grpcServerOpts = append(grpcServerOpts, grpc.ChainUnaryInterceptor( + log.UnaryServerInterceptor(ctx), + debug.UnaryServerInterceptor(), + )) + grpcServerOpts = append(grpcServerOpts, grpc.ChainStreamInterceptor( + log.StreamServerInterceptor(ctx), + debug.StreamServerInterceptor(), + )) + grpcServer := grpc.NewServer(grpcServerOpts...) + + {{- range .Services }} + {{- if .HasGRPC }} + // {{ .Name }} gRPC server + {{ .SvcVar }}GRPCServer := {{ .GenGRPCPkg }}.New({{ .EpVar }}, nil) + {{ .GenGRPCPbPkg }}.Register{{ .StructName }}Server(grpcServer, {{ .SvcVar }}GRPCServer) + {{- end }} + {{- end }} - // 7. Start HTTP servers (graceful shutdown) + reflection.Register(grpcServer) + {{- end }} + + // 7. Start servers (graceful shutdown) errc := make(chan error) go func() { c := make(chan os.Signal, 1) @@ -167,10 +172,24 @@ func main() { go func() { defer wg.Done() + {{- if .HasHTTP }} go func() { log.Printf(ctx, "HTTP server listening on %s", *httpaddr) errc <- httpServer.ListenAndServe() }() + {{- end }} + + {{- if .HasGRPC }} + go func() { + lis, err := net.Listen("tcp", *grpcaddr) + if err != nil { + errc <- err + return + } + log.Printf(ctx, "gRPC server listening on %s", *grpcaddr) + errc <- grpcServer.Serve(lis) + }() + {{- end }} go func() { log.Printf(ctx, "Metrics server listening on %s", *metricsAddr) @@ -178,7 +197,7 @@ func main() { }() <-ctx.Done() - log.Printf(ctx, "shutting down HTTP servers") + log.Printf(ctx, "shutting down servers") // Shutdown gracefully with a 30s timeout. sctx, scancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -192,9 +211,16 @@ func main() { } {{- end }} + {{- if .HasHTTP }} if err := httpServer.Shutdown(sctx); err != nil { log.Errorf(sctx, err, "failed to shutdown HTTP server") } + {{- end }} + + {{- if .HasGRPC }} + grpcServer.GracefulStop() + {{- end }} + if err := metricsServer.Shutdown(sctx); err != nil { log.Errorf(sctx, err, "failed to shutdown metrics server") }