Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added unary interceptor and removed extra logs #1255

Merged
merged 6 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 8 additions & 34 deletions internal/app/backend/backend_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,8 @@ func (s *backendService) FetchMatches(req *pb.FetchMatchesRequest, stream pb.Bac

// TODO: Send mmf error in FetchSummary instead of erroring call.
if syncErr != nil || mmfErr != nil {
logger.WithFields(logrus.Fields{
"syncErr": syncErr,
"mmfErr": mmfErr,
}).Error("error(s) in FetchMatches call.")

return fmt.Errorf(
"error(s) in FetchMatches call. syncErr=[%s], mmfErr=[%s]",
"error(s) in FetchMatches call. syncErr=[%v], mmfErr=[%v]",
syncErr,
mmfErr,
)
Expand Down Expand Up @@ -203,17 +198,13 @@ func callGrpcMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProf
var conn *grpc.ClientConn
conn, err := cc.GetGRPC(address)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"function": address,
}).Error("failed to establish grpc client connection to match function")
return status.Error(codes.InvalidArgument, "failed to connect to match function")
return status.Error(codes.InvalidArgument, "failed to establish grpc client connection to match function")
}
client := pb.NewMatchFunctionClient(conn)

stream, err := client.Run(ctx, &pb.RunRequest{Profile: profile})
if err != nil {
logger.WithError(err).Error("failed to run match function for profile")
err = errors.Wrap(err, "failed to run match function for profile")
if ctx.Err() != nil {
// gRPC likes to suppress the context's error, so stop that.
return ctx.Err()
Expand All @@ -227,7 +218,7 @@ func callGrpcMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProf
break
}
if err != nil {
logger.Errorf("%v.Run() error, %v\n", client, err)
err = errors.Wrapf(err, "%v.Run() error, %v", client, err)
if ctx.Err() != nil {
// gRPC likes to suppress the context's error, so stop that.
return ctx.Err()
Expand All @@ -247,11 +238,8 @@ func callGrpcMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProf
func callHTTPMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProfile, address string, proposals chan<- *pb.Match) error {
client, baseURL, err := cc.GetHTTP(address)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"function": address,
}).Error("failed to establish rest client connection to match function")
return status.Error(codes.InvalidArgument, "failed to connect to match function")
err = errors.Wrapf(err, "failed to establish rest client connection to match function: %s", address)
return status.Error(codes.InvalidArgument, err.Error())
}

var m jsonpb.Marshaler
Expand Down Expand Up @@ -308,9 +296,9 @@ func callHTTPMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProf
}

func (s *backendService) ReleaseTickets(ctx context.Context, req *pb.ReleaseTicketsRequest) (*pb.ReleaseTicketsResponse, error) {
err := doReleasetickets(ctx, req, s.store)
err := s.store.DeleteTicketsFromPendingRelease(ctx, req.GetTicketIds())
if err != nil {
logger.WithError(err).Error("failed to remove the awaiting tickets from the pending release for requested tickets")
err = errors.Wrap(err, "failed to remove the awaiting tickets from the pending release for requested tickets")
return nil, err
}

Expand All @@ -330,7 +318,6 @@ func (s *backendService) ReleaseAllTickets(ctx context.Context, req *pb.ReleaseA
func (s *backendService) AssignTickets(ctx context.Context, req *pb.AssignTicketsRequest) (*pb.AssignTicketsResponse, error) {
resp, err := doAssignTickets(ctx, req, s.store)
if err != nil {
logger.WithError(err).Error("failed to update assignments for requested tickets")
return nil, err
}

Expand All @@ -346,7 +333,6 @@ func (s *backendService) AssignTickets(ctx context.Context, req *pb.AssignTicket
func doAssignTickets(ctx context.Context, req *pb.AssignTicketsRequest, store statestore.Service) (*pb.AssignTicketsResponse, error) {
resp, tickets, err := store.UpdateAssignments(ctx, req)
if err != nil {
logger.WithError(err).Error("failed to update assignments")
return nil, err
}

Expand Down Expand Up @@ -381,18 +367,6 @@ func doAssignTickets(ctx context.Context, req *pb.AssignTicketsRequest, store st
return resp, nil
}

func doReleasetickets(ctx context.Context, req *pb.ReleaseTicketsRequest, store statestore.Service) error {
err := store.DeleteTicketsFromPendingRelease(ctx, req.GetTicketIds())
if err != nil {
logger.WithFields(logrus.Fields{
"ticket_ids": req.GetTicketIds(),
}).WithError(err).Error("failed to delete the tickets from the pending release list")
return err
}

return nil
}

func recordTimeToAssignment(ctx context.Context, ticket *pb.Ticket) error {
if ticket.Assignment == nil {
return fmt.Errorf("assignment for ticket %s is nil", ticket.Id)
Expand Down
14 changes: 2 additions & 12 deletions internal/app/evaluator/evaluator_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,12 @@ import (
"context"
"io"

"github.com/sirupsen/logrus"
"github.com/pkg/errors"
"go.opencensus.io/stats"
"golang.org/x/sync/errgroup"
"open-match.dev/open-match/pkg/pb"
)

var (
logger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "evaluator.harness.golang",
})
)

// Evaluator is the function signature for the Evaluator to be implemented by
// the user. The harness will pass the Matches to evaluate to the Evaluator
// and the Evaluator will return an accepted list of Matches.
Expand Down Expand Up @@ -95,8 +88,5 @@ func (s *evaluatorService) Evaluate(stream pb.Evaluator_EvaluateServer) error {
})

err := g.Wait()
if err != nil {
logger.WithError(err).Error("Error in evaluator.Evaluate")
}
return err
return errors.Wrap(err, "Error in evaluator.Evaluate")
}
28 changes: 1 addition & 27 deletions internal/app/frontend/frontend_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,11 @@ func doCreateTicket(ctx context.Context, req *pb.CreateTicketRequest, store stat

err := store.CreateTicket(ctx, ticket)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"ticket": ticket,
}).Error("failed to create the ticket")
return nil, err
}

err = store.IndexTicket(ctx, ticket)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"ticket": ticket,
}).Error("failed to index the ticket")
return nil, err
}

Expand All @@ -118,10 +110,6 @@ func doDeleteTicket(ctx context.Context, id string, store statestore.Service) er
// Deindex this Ticket to remove it from matchmaking pool.
err := store.DeindexTicket(ctx, id)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"id": id,
}).Error("failed to deindex the ticket")
return err
}

Expand Down Expand Up @@ -152,20 +140,7 @@ func doDeleteTicket(ctx context.Context, id string, store statestore.Service) er

// GetTicket get the Ticket associated with the specified TicketId.
func (s *frontendService) GetTicket(ctx context.Context, req *pb.GetTicketRequest) (*pb.Ticket, error) {
return doGetTickets(ctx, req.GetTicketId(), s.store)
}

func doGetTickets(ctx context.Context, id string, store statestore.Service) (*pb.Ticket, error) {
ticket, err := store.GetTicket(ctx, id)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"id": id,
}).Error("failed to get the ticket")
return nil, err
}

return ticket, nil
return s.store.GetTicket(ctx, req.GetTicketId())
}

// WatchAssignments stream back Assignment of the specified TicketId if it is updated.
Expand Down Expand Up @@ -197,7 +172,6 @@ func doWatchAssignments(ctx context.Context, id string, sender func(*pb.Assignme

err := sender(currAssignment)
if err != nil {
logger.WithError(err).Error("failed to send Redis response to grpc server")
return status.Errorf(codes.Aborted, err.Error())
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/frontend/frontend_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func TestDoGetTicket(t *testing.T) {

test.preAction(ctx, cancel, store)

ticket, err := doGetTickets(ctx, fakeTicket.GetId(), store)
ticket, err := store.GetTicket(ctx, fakeTicket.GetId())
require.Equal(t, test.wantCode.String(), status.Convert(err).Code().String())

if err == nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/app/query/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *queryService) QueryTickets(req *pb.QueryTicketsRequest, responseServer
}
})
if err != nil {
logger.WithError(err).Error("Failed to run request.")
err = errors.Wrap(err, "QueryTickets: failed to run request")
return err
}
stats.Record(ctx, ticketsPerQuery.M(int64(len(results))))
Expand Down Expand Up @@ -111,7 +111,7 @@ func (s *queryService) QueryTicketIds(req *pb.QueryTicketIdsRequest, responseSer
}
})
if err != nil {
logger.WithError(err).Error("Failed to run request.")
err = errors.Wrap(err, "QueryTicketIds: failed to run request")
return err
}
stats.Record(ctx, ticketsPerQuery.M(int64(len(results))))
Expand Down
18 changes: 17 additions & 1 deletion internal/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ type ServerParams struct {

// NewServerParamsFromConfig returns server Params initialized from the configuration file.
func NewServerParamsFromConfig(cfg config.View, prefix string, listen func(network, address string) (net.Listener, error)) (*ServerParams, error) {
serverLogger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": prefix,
})

grpcL, err := listen("tcp", fmt.Sprintf(":%d", cfg.GetInt(prefix+".grpcport")))
if err != nil {
return nil, errors.Wrap(err, "can't start listener for grpc")
Expand Down Expand Up @@ -283,7 +288,7 @@ func newGRPCServerOptions(params *ServerParams) []grpc.ServerOption {
}
}

// TODO: add unary interceptor once all redundant logs are removed
ui = append(ui, serverUnaryInterceptor)
si = append(si, serverStreamInterceptor)

if params.enableMetrics {
Expand Down Expand Up @@ -311,3 +316,14 @@ func serverStreamInterceptor(srv interface{},
}
return err
}

func serverUnaryInterceptor(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
h, err := handler(ctx, req)
if err != nil {
serverLogger.Error(err)
}
return h, err
}
4 changes: 2 additions & 2 deletions internal/testing/e2e/ticket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestAssignTicketsInvalidArgument(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
_, err := om.Backend().AssignTickets(ctx, tt.req)
require.Equal(t, codes.InvalidArgument, status.Convert(err).Code())
require.Equal(t, codes.InvalidArgument.String(), status.Convert(err).Code().String())
require.Equal(t, tt.msg, status.Convert(err).Message())
})
}
Expand Down Expand Up @@ -524,7 +524,7 @@ func TestCreateTicketErrors(t *testing.T) {
resp, err := om.Frontend().CreateTicket(ctx, tt.req)
require.Nil(t, resp)
s := status.Convert(err)
require.Equal(t, codes.InvalidArgument, s.Code())
require.Equal(t, codes.InvalidArgument.String(), s.Code().String())
require.Equal(t, s.Message(), tt.msg)
})
}
Expand Down