Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"flag"

"google.golang.org/grpc/credentials/insecure"

server "github.com/TylerJGabb/grpc-http-proxy/examples/server/impl"
Expand All @@ -10,7 +11,7 @@ import (

func main() {
exampleServer := server.ExampleServer{}
addr := exampleServer.Start(9091)
addr := exampleServer.Start(9092)

port := flag.Int("port", 8080, "the port number")
flag.Parse()
Expand Down
33 changes: 11 additions & 22 deletions examples/server/impl/intbidi.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package server

import (
"context"
"fmt"
"io"

"github.com/TylerJGabb/grpc-http-proxy/internal/logging"
"github.com/TylerJGabb/grpc-http-proxy/pkg/tgsbpb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -13,50 +10,42 @@ import (
func (s *ExampleServer) BidirectionalStreamInt(
stream tgsbpb.TylerSandboxService_BidirectionalStreamIntServer,
) error {
// when we return out of this method, the context of the stream will be canceled
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
logger := logging.NewLogger(logging.WithOperationId("server.BidirectionalStreamInt"))
logger.Info("stream starting")
in := make(chan *tgsbpb.BidirectionalStreamIntRequest)
errs := make(chan error, 1)
go func() {
defer logger.Info("server receive goroutine ended")
for {
// this is a blocking call, will unblock when stream context is canceled
msg, err := stream.Recv()
if err != nil {
if err == io.EOF {
cancel()
} else {
fmt.Printf("receive error: %s\n", err.Error())
errs <- err
}
logger.Info("error while receiving from stream", "err", err)
return
}
select {
case in <- msg:
case <-ctx.Done():
case <-stream.Context().Done():
return
}
}
}()
for {
select {
case err := <-errs:
return err
case <-ctx.Done():
case <-stream.Context().Done():
return nil
case req := <-in:
if req.Close {
cancel()
logger.Info("server is closing the stream intentionally")
return status.Error(codes.Canceled, "stream closed intentionally")
}
if req.RespondWithError {
logger.Info("server is intentionally returning error")
return status.Error(codes.Internal, "error returned intentionally")
}
send := &tgsbpb.BidirectionalStreamIntResponse{Value: req.Value}
// this is a blocking call, will unblock when stream context is canceled
logger.Info("sending response", "value", send.Value)
if err := stream.Send(send); err != nil {
fmt.Printf("send error: %s\n", err.Error())
return err
logger.Info("unexpected error while sending on stream", "err", err)
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion examples/server/impl/serverstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package server

import (
"fmt"
"log/slog"
"math"
"os"
"time"

"github.com/TylerJGabb/grpc-http-proxy/pkg/tgsbpb"
Expand All @@ -14,24 +16,31 @@ func (s *ExampleServer) ServerStreamInt(
req *tgsbpb.ServerStreamIntRequest,
stream tgsbpb.TylerSandboxService_ServerStreamIntServer,
) error {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{})).With("method", "ServerStreamInt")
req.SendPeriodSeconds = uint32(math.Max(float64(req.SendPeriodSeconds), 1))
if req.SendErrorAtNthResponse == 0 {
logger.Info("SendErrorAtNthResponse is 0, will never send an error")
req.SendErrorAtNthResponse = math.MaxUint32
}
if req.CloseAtNthResponse == 0 {
logger.Info("CloseAtNthResponse is 0, will never close the stream")
req.CloseAtNthResponse = math.MaxUint32
}
var responseCount uint32 = 1
defer logger.Info("stream ended")
for {
time.Sleep(time.Second * time.Duration(req.SendPeriodSeconds))
if responseCount >= req.CloseAtNthResponse {
logger.Info("server is ending the stream")
return nil
}
if responseCount >= req.SendErrorAtNthResponse {
logger.Error("server is intentionally returning error")
return status.Error(codes.Internal, "intentionally returned error")
}
logger.Info("sending response", "responseCount", responseCount, "value", req.Value)
if err := stream.Send(&tgsbpb.ServerStreamIntResponse{Value: req.Value}); err != nil {
fmt.Println(err.Error())
logger.Info("unexpected error while sending on stream", "err", err)
return status.Error(codes.Internal, err.Error())
}
responseCount++
Expand Down
145 changes: 145 additions & 0 deletions internal/logging/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package logging

import (
"io"
"log/slog"
"os"
)

// structuredLoggingEnvVar logging off by default
var structuredLoggingEnvVar bool = false

// loggingLevelEnvVar default logging level is info
var loggingLevelEnvVar slog.Level = slog.LevelInfo

func init() {
if os.Getenv("STRUCTURED_LOGGING") == "on" {
structuredLoggingEnvVar = true
}
if logLevel := os.Getenv("LOG_LEVEL"); logLevel != "" {
loggingLevelEnvVar.UnmarshalText([]byte(logLevel))
}
}

type LoggerOptions struct {
requestId string
operationId string
attributes map[string]string
output io.Writer
structured bool
logLevel slog.Level
}
type LoggerOptionFuction func(*LoggerOptions)

func WithAttribute(key, value string) LoggerOptionFuction {
return func(l *LoggerOptions) {
l.attributes[key] = value
}
}

func WithRequestId(requestId string) LoggerOptionFuction {
return func(l *LoggerOptions) {
l.requestId = requestId
}
}

func WithOperationId(operationId string) LoggerOptionFuction {
return func(l *LoggerOptions) {
l.operationId = operationId
}
}

func WithOutput(w io.Writer) LoggerOptionFuction {
return func(l *LoggerOptions) {
l.output = w
}
}

func WithStructuredLogging(enabled bool) LoggerOptionFuction {
return func(l *LoggerOptions) {
l.structured = enabled
}
}

func WithLogLevel(level slog.Level) LoggerOptionFuction {
return func(l *LoggerOptions) {
l.logLevel = level
}
}

func (l *LoggerOptions) Attrs() []slog.Attr {
attrs := make([]slog.Attr, 0)
if l.requestId != "" {
attrs = append(attrs, slog.Attr{
Key: "requestId",
Value: slog.StringValue(l.requestId),
})
}
if l.operationId != "" {
attrs = append(attrs, slog.Attr{
Key: "operationId",
Value: slog.StringValue(l.operationId),
})
}
for attributeKey, attributeValue := range l.attributes {
attrs = append(attrs, slog.Attr{
Key: attributeKey,
Value: slog.StringValue(attributeValue),
})
}
return attrs
}

/*
NewLogger creates a new logger.
By default this logger will output to stdout, in un-structured text format.
If STRUCTURED_LOGGING environment variable is set to "on", output will be written in structured JSON format by deafult.
You can customize the logger by passing LoggerOptionFuction to this function, which will override the default any defaults.
*/
func NewLogger(optFuncs ...LoggerOptionFuction) *slog.Logger {
loggerOpts := &LoggerOptions{
output: os.Stdout,
structured: structuredLoggingEnvVar,
logLevel: loggingLevelEnvVar,
attributes: make(map[string]string),
}
for _, optFunc := range optFuncs {
optFunc(loggerOpts)
}
handler := buildHandler(loggerOpts)
return slog.New(handler)
}

func buildHandler(loggerOptions *LoggerOptions) slog.Handler {
handlerOpts := &slog.HandlerOptions{
Level: loggerOptions.logLevel,
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
if a.Key == "level" {
return slog.Attr{
Key: "severity",
Value: a.Value,
}
}
if a.Key == "msg" {
return slog.Attr{
Key: "message",
Value: a.Value,
}
} else if a.Key == "time" {
// Convert time to UTC
return slog.Attr{
Key: "time",
Value: slog.TimeValue(a.Value.Time().UTC()),
}
}
return a
},
}
var handler slog.Handler
if loggerOptions.structured {
handler = slog.NewJSONHandler(loggerOptions.output, handlerOpts)
} else {
handler = slog.NewTextHandler(loggerOptions.output, handlerOpts)
}
return handler.WithAttrs(loggerOptions.Attrs())
}