Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 101 additions & 9 deletions mains/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type srvInfo struct {
APIPkg string
Services []*service.Data
HasWS bool
HasHTTP bool
HasGRPC bool
ServerName string
}

Expand All @@ -43,7 +45,11 @@ type svcT struct {
SrvVar string
GenPkg string
GenHTTPPkg string
GenGRPCPkg string
GenGRPCPbPkg string
HasWebSocket bool
HasHTTP bool
HasGRPC bool
}

// Register the plugin for the example phase.
Expand Down Expand Up @@ -109,21 +115,32 @@ func generateExample(genpkg string, roots []eval.Root, files []*codegen.File) ([
apipkg := apiPkgAlias(genpkg, roots)
if info, ok := srvMap[dir]; ok {
info.HasWS = hasWS
info.HasHTTP = true
if info.APIPkg == "" { info.APIPkg = apipkg }
} else {
srvMap[dir] = &srvInfo{Dir: dir, APIPkg: apipkg, Services: svcs, HasWS: hasWS}
srvMap[dir] = &srvInfo{Dir: dir, APIPkg: apipkg, Services: svcs, HasWS: hasWS, HasHTTP: true}
}
}
// Detect gRPC servers from grpc.go files
for _, f := range files {
if filepath.Base(f.Path) != "grpc.go" { continue }
segs := strings.Split(filepath.ToSlash(f.Path), "/")
if len(segs) < 3 || segs[0] != "cmd" { continue }
dir := segs[1]
if info, ok := srvMap[dir]; ok {
info.HasGRPC = true
}
}

if len(srvMap) == 0 {
return files, nil
}

// Filter out default example mains and http.go; we'll add our own mains.
// Filter out default example mains, http.go, and grpc.go; we'll add our own mains.
var out []*codegen.File
for _, f := range files {
base := filepath.Base(f.Path)
if strings.HasPrefix(f.Path, "cmd/") && (base == "main.go" || base == "http.go") {
if strings.HasPrefix(f.Path, "cmd/") && (base == "main.go" || base == "http.go" || base == "grpc.go") {
continue
}
out = append(out, f)
Expand Down Expand Up @@ -156,6 +173,14 @@ func generateExample(genpkg string, roots []eval.Root, files []*codegen.File) ([
codegen.GoaNamedImport("http", "goahttp"),
{Path: "google.golang.org/grpc/credentials/insecure"},
}
if info.HasGRPC {
specs = append(specs,
&codegen.ImportSpec{Path: "net"},
&codegen.ImportSpec{Path: "google.golang.org/grpc"},
&codegen.ImportSpec{Path: "google.golang.org/grpc/reflection"},
&codegen.ImportSpec{Path: "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"},
)
}
if info.HasWS {
specs = append(specs, &codegen.ImportSpec{Path: "github.com/gorilla/websocket"})
}
Expand All @@ -164,19 +189,45 @@ func generateExample(genpkg string, roots []eval.Root, files []*codegen.File) ([

scope := codegen.NewNameScope()
var svcsData []svcT
httpBySvc := httpServicesByName(roots)
grpcBySvc := grpcServicesByName(roots)
wsBySvc := httpWebSocketByService(roots)
hasAnyWS := false
hasAnyHTTP := false
hasAnyGRPC := false
for _, sd := range info.Services {
genAlias := scope.Unique(sd.PkgName, "svc")
httpAlias := scope.Unique(sd.PkgName+"svr", "svr")
specs = append(specs,
&codegen.ImportSpec{Path: path.Join(genpkg, sd.PathName), Name: genAlias},
&codegen.ImportSpec{Path: path.Join(genpkg, "http", sd.PathName, "server"), Name: httpAlias},
)
hasHTTP := httpBySvc[sd.Name]
hasGRPC := grpcBySvc[sd.Name]
hws := wsBySvc[sd.Name]

var httpAlias, grpcAlias, grpcPbAlias string

// Always add the base service package
specs = append(specs, &codegen.ImportSpec{Path: path.Join(genpkg, sd.PathName), Name: genAlias})

// Conditionally add HTTP server imports
if hasHTTP {
httpAlias = scope.Unique(sd.PkgName+"svr", "svr")
specs = append(specs, &codegen.ImportSpec{Path: path.Join(genpkg, "http", sd.PathName, "server"), Name: httpAlias})
hasAnyHTTP = true
}

// Conditionally add gRPC server imports
if hasGRPC {
grpcAlias = scope.Unique(sd.PkgName+"grpc", "grpcsvc")
grpcPbAlias = scope.Unique(sd.PkgName+"pb", "pb")
specs = append(specs,
&codegen.ImportSpec{Path: path.Join(genpkg, "grpc", sd.PathName, "server"), Name: grpcAlias},
&codegen.ImportSpec{Path: path.Join(genpkg, "grpc", sd.PathName, "pb"), Name: grpcPbAlias},
)
hasAnyGRPC = true
}

if hws {
hasAnyWS = true
}

svcsData = append(svcsData, svcT{
Name: sd.Name,
StructName: sd.StructName,
Expand All @@ -185,7 +236,11 @@ func generateExample(genpkg string, roots []eval.Root, files []*codegen.File) ([
SrvVar: sd.VarName + "Server",
GenPkg: genAlias,
GenHTTPPkg: httpAlias,
GenGRPCPkg: grpcAlias,
GenGRPCPbPkg: grpcPbAlias,
HasWebSocket: hws,
HasHTTP: hasHTTP,
HasGRPC: hasGRPC,
})
}

Expand All @@ -195,6 +250,8 @@ func generateExample(genpkg string, roots []eval.Root, files []*codegen.File) ([
"APIPkg": info.APIPkg,
"Services": svcsData,
"HasAnyWebSocket": hasAnyWS,
"HasHTTP": hasAnyHTTP,
"HasGRPC": hasAnyGRPC,
"ServiceCount": len(svcsData),
"ServerLabel": serverLabel(roots),
}},
Expand Down Expand Up @@ -265,7 +322,8 @@ func httpWebSocketByService(roots []eval.Root) map[string]bool {
if e.SSE != nil {
continue
}
if e.MethodExpr != nil && e.MethodExpr.Stream != expr.NoStreamKind {
// Stream is 0 when no streaming is defined, and >= NoStreamKind (1) when streaming is used
if e.MethodExpr != nil && e.MethodExpr.Stream != 0 {
hasWS[svc.Name()] = true
break
}
Expand All @@ -286,3 +344,37 @@ func rootServer(roots []eval.Root) *expr.ServerExpr {
}
return nil
}

// httpServicesByName returns map of service names that have HTTP endpoints.
func httpServicesByName(roots []eval.Root) map[string]bool {
hasHTTP := map[string]bool{}
for _, r := range roots {
root, ok := r.(*expr.RootExpr)
if !ok || root.API == nil || root.API.HTTP == nil {
continue
}
for _, svc := range root.API.HTTP.Services {
if len(svc.HTTPEndpoints) > 0 {
hasHTTP[svc.Name()] = true
}
}
}
return hasHTTP
}

// grpcServicesByName returns map of service names that have gRPC endpoints.
func grpcServicesByName(roots []eval.Root) map[string]bool {
hasGRPC := map[string]bool{}
for _, r := range roots {
root, ok := r.(*expr.RootExpr)
if !ok || root.API == nil || root.API.GRPC == nil {
continue
}
for _, svc := range root.API.GRPC.Services {
if len(svc.GRPCEndpoints) > 0 {
hasGRPC[svc.Name()] = true
}
}
}
return hasGRPC
}
88 changes: 57 additions & 31 deletions mains/templates/main.go.tpl
Original file line number Diff line number Diff line change
@@ -1,35 +1,11 @@
package main

import (
"context"
"flag"
"fmt"
"net/http"
"net/http/httptrace"
"os"
"os/signal"
"sync"
"syscall"
"time"

"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"goa.design/clue/clue"
"goa.design/clue/debug"
"goa.design/clue/health"
"goa.design/clue/log"
goahttp "goa.design/goa/v3/http"
{{- if .HasAnyWebSocket }}
"github.com/gorilla/websocket"
{{- end }}
"google.golang.org/grpc/credentials/insecure"
)

func main() {
var (
{{- if .HasHTTP }}
httpaddr = flag.String("http-addr", ":8080", "HTTP listen address")
{{- end }}
{{- if .HasGRPC }}
grpcaddr = flag.String("grpc-addr", ":9090", "gRPC listen address")
{{- end }}
metricsAddr = flag.String("metrics-addr", ":8081", "metrics listen address")
coladdr = flag.String("otel-addr", ":4317", "OpenTelemetry collector listen address")
debugf = flag.Bool("debug", false, "Enable debug logs")
Expand Down Expand Up @@ -126,6 +102,7 @@ func main() {
{{ .EpVar }}.Use(log.Endpoint)
{{- end }}

{{- if .HasHTTP }}
// 6. Create HTTP transport
mux := goahttp.NewMuxer()
debug.MountDebugLogEnabler(debug.Adapt(mux))
Expand All @@ -139,6 +116,7 @@ func main() {
{{- end }}

{{- range .Services }}
{{- if .HasHTTP }}
// {{ .Name }} HTTP server
{{- if .HasWebSocket }}
{{ .SrvVar }} := {{ .GenHTTPPkg }}.New({{ .EpVar }}, mux, goahttp.RequestDecoder, goahttp.ResponseEncoder, nil, nil, upgrader, nil)
Expand All @@ -150,10 +128,37 @@ func main() {
log.Print(ctx, log.KV{K: "method", V: m.Method}, log.KV{K: "endpoint", V: m.Verb + " " + m.Pattern})
}
{{- end }}
{{- end }}

httpServer := &http.Server{Addr: *httpaddr, Handler: handler}
{{- end }}

{{- if .HasGRPC }}
// 6b. Create gRPC server with interceptors
var grpcServerOpts []grpc.ServerOption
grpcServerOpts = append(grpcServerOpts, grpc.StatsHandler(otelgrpc.NewServerHandler()))
grpcServerOpts = append(grpcServerOpts, grpc.ChainUnaryInterceptor(
log.UnaryServerInterceptor(ctx),
debug.UnaryServerInterceptor(),
))
grpcServerOpts = append(grpcServerOpts, grpc.ChainStreamInterceptor(
log.StreamServerInterceptor(ctx),
debug.StreamServerInterceptor(),
))
grpcServer := grpc.NewServer(grpcServerOpts...)

{{- range .Services }}
{{- if .HasGRPC }}
// {{ .Name }} gRPC server
{{ .SvcVar }}GRPCServer := {{ .GenGRPCPkg }}.New({{ .EpVar }}, nil)
{{ .GenGRPCPbPkg }}.Register{{ .StructName }}Server(grpcServer, {{ .SvcVar }}GRPCServer)
{{- end }}
{{- end }}

// 7. Start HTTP servers (graceful shutdown)
reflection.Register(grpcServer)
{{- end }}

// 7. Start servers (graceful shutdown)
errc := make(chan error)
go func() {
c := make(chan os.Signal, 1)
Expand All @@ -167,18 +172,32 @@ func main() {
go func() {
defer wg.Done()

{{- if .HasHTTP }}
go func() {
log.Printf(ctx, "HTTP server listening on %s", *httpaddr)
errc <- httpServer.ListenAndServe()
}()
{{- end }}

{{- if .HasGRPC }}
go func() {
lis, err := net.Listen("tcp", *grpcaddr)
if err != nil {
errc <- err
return
}
log.Printf(ctx, "gRPC server listening on %s", *grpcaddr)
errc <- grpcServer.Serve(lis)
}()
{{- end }}

go func() {
log.Printf(ctx, "Metrics server listening on %s", *metricsAddr)
errc <- metricsServer.ListenAndServe()
}()

<-ctx.Done()
log.Printf(ctx, "shutting down HTTP servers")
log.Printf(ctx, "shutting down servers")

// Shutdown gracefully with a 30s timeout.
sctx, scancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -192,9 +211,16 @@ func main() {
}
{{- end }}

{{- if .HasHTTP }}
if err := httpServer.Shutdown(sctx); err != nil {
log.Errorf(sctx, err, "failed to shutdown HTTP server")
}
{{- end }}

{{- if .HasGRPC }}
grpcServer.GracefulStop()
{{- end }}

if err := metricsServer.Shutdown(sctx); err != nil {
log.Errorf(sctx, err, "failed to shutdown metrics server")
}
Expand Down
Loading