Skip to content

Commit

Permalink
Merge branch 'master' into feature/ConfigurableExclusiveRanges
Browse files Browse the repository at this point in the history
  • Loading branch information
HazWard committed Oct 15, 2020
2 parents cd1797a + 6f05e52 commit 6d35482
Show file tree
Hide file tree
Showing 10 changed files with 773 additions and 228 deletions.
42 changes: 8 additions & 34 deletions internal/app/backend/backend_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,8 @@ func (s *backendService) FetchMatches(req *pb.FetchMatchesRequest, stream pb.Bac

// TODO: Send mmf error in FetchSummary instead of erroring call.
if syncErr != nil || mmfErr != nil {
logger.WithFields(logrus.Fields{
"syncErr": syncErr,
"mmfErr": mmfErr,
}).Error("error(s) in FetchMatches call.")

return fmt.Errorf(
"error(s) in FetchMatches call. syncErr=[%s], mmfErr=[%s]",
"error(s) in FetchMatches call. syncErr=[%v], mmfErr=[%v]",
syncErr,
mmfErr,
)
Expand Down Expand Up @@ -203,17 +198,13 @@ func callGrpcMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProf
var conn *grpc.ClientConn
conn, err := cc.GetGRPC(address)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"function": address,
}).Error("failed to establish grpc client connection to match function")
return status.Error(codes.InvalidArgument, "failed to connect to match function")
return status.Error(codes.InvalidArgument, "failed to establish grpc client connection to match function")
}
client := pb.NewMatchFunctionClient(conn)

stream, err := client.Run(ctx, &pb.RunRequest{Profile: profile})
if err != nil {
logger.WithError(err).Error("failed to run match function for profile")
err = errors.Wrap(err, "failed to run match function for profile")
if ctx.Err() != nil {
// gRPC likes to suppress the context's error, so stop that.
return ctx.Err()
Expand All @@ -227,7 +218,7 @@ func callGrpcMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProf
break
}
if err != nil {
logger.Errorf("%v.Run() error, %v\n", client, err)
err = errors.Wrapf(err, "%v.Run() error, %v", client, err)
if ctx.Err() != nil {
// gRPC likes to suppress the context's error, so stop that.
return ctx.Err()
Expand All @@ -247,11 +238,8 @@ func callGrpcMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProf
func callHTTPMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProfile, address string, proposals chan<- *pb.Match) error {
client, baseURL, err := cc.GetHTTP(address)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"function": address,
}).Error("failed to establish rest client connection to match function")
return status.Error(codes.InvalidArgument, "failed to connect to match function")
err = errors.Wrapf(err, "failed to establish rest client connection to match function: %s", address)
return status.Error(codes.InvalidArgument, err.Error())
}

var m jsonpb.Marshaler
Expand Down Expand Up @@ -308,9 +296,9 @@ func callHTTPMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProf
}

func (s *backendService) ReleaseTickets(ctx context.Context, req *pb.ReleaseTicketsRequest) (*pb.ReleaseTicketsResponse, error) {
err := doReleasetickets(ctx, req, s.store)
err := s.store.DeleteTicketsFromPendingRelease(ctx, req.GetTicketIds())
if err != nil {
logger.WithError(err).Error("failed to remove the awaiting tickets from the pending release for requested tickets")
err = errors.Wrap(err, "failed to remove the awaiting tickets from the pending release for requested tickets")
return nil, err
}

Expand All @@ -330,7 +318,6 @@ func (s *backendService) ReleaseAllTickets(ctx context.Context, req *pb.ReleaseA
func (s *backendService) AssignTickets(ctx context.Context, req *pb.AssignTicketsRequest) (*pb.AssignTicketsResponse, error) {
resp, err := doAssignTickets(ctx, req, s.store)
if err != nil {
logger.WithError(err).Error("failed to update assignments for requested tickets")
return nil, err
}

Expand All @@ -346,7 +333,6 @@ func (s *backendService) AssignTickets(ctx context.Context, req *pb.AssignTicket
func doAssignTickets(ctx context.Context, req *pb.AssignTicketsRequest, store statestore.Service) (*pb.AssignTicketsResponse, error) {
resp, tickets, err := store.UpdateAssignments(ctx, req)
if err != nil {
logger.WithError(err).Error("failed to update assignments")
return nil, err
}

Expand Down Expand Up @@ -381,18 +367,6 @@ func doAssignTickets(ctx context.Context, req *pb.AssignTicketsRequest, store st
return resp, nil
}

func doReleasetickets(ctx context.Context, req *pb.ReleaseTicketsRequest, store statestore.Service) error {
err := store.DeleteTicketsFromPendingRelease(ctx, req.GetTicketIds())
if err != nil {
logger.WithFields(logrus.Fields{
"ticket_ids": req.GetTicketIds(),
}).WithError(err).Error("failed to delete the tickets from the pending release list")
return err
}

return nil
}

func recordTimeToAssignment(ctx context.Context, ticket *pb.Ticket) error {
if ticket.Assignment == nil {
return fmt.Errorf("assignment for ticket %s is nil", ticket.Id)
Expand Down
14 changes: 2 additions & 12 deletions internal/app/evaluator/evaluator_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,12 @@ import (
"context"
"io"

"github.com/sirupsen/logrus"
"github.com/pkg/errors"
"go.opencensus.io/stats"
"golang.org/x/sync/errgroup"
"open-match.dev/open-match/pkg/pb"
)

var (
logger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "evaluator.harness.golang",
})
)

// Evaluator is the function signature for the Evaluator to be implemented by
// the user. The harness will pass the Matches to evaluate to the Evaluator
// and the Evaluator will return an accepted list of Matches.
Expand Down Expand Up @@ -95,8 +88,5 @@ func (s *evaluatorService) Evaluate(stream pb.Evaluator_EvaluateServer) error {
})

err := g.Wait()
if err != nil {
logger.WithError(err).Error("Error in evaluator.Evaluate")
}
return err
return errors.Wrap(err, "Error in evaluator.Evaluate")
}
28 changes: 1 addition & 27 deletions internal/app/frontend/frontend_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,11 @@ func doCreateTicket(ctx context.Context, req *pb.CreateTicketRequest, store stat

err := store.CreateTicket(ctx, ticket)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"ticket": ticket,
}).Error("failed to create the ticket")
return nil, err
}

err = store.IndexTicket(ctx, ticket)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"ticket": ticket,
}).Error("failed to index the ticket")
return nil, err
}

Expand All @@ -118,10 +110,6 @@ func doDeleteTicket(ctx context.Context, id string, store statestore.Service) er
// Deindex this Ticket to remove it from matchmaking pool.
err := store.DeindexTicket(ctx, id)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"id": id,
}).Error("failed to deindex the ticket")
return err
}

Expand Down Expand Up @@ -152,20 +140,7 @@ func doDeleteTicket(ctx context.Context, id string, store statestore.Service) er

// GetTicket get the Ticket associated with the specified TicketId.
func (s *frontendService) GetTicket(ctx context.Context, req *pb.GetTicketRequest) (*pb.Ticket, error) {
return doGetTickets(ctx, req.GetTicketId(), s.store)
}

func doGetTickets(ctx context.Context, id string, store statestore.Service) (*pb.Ticket, error) {
ticket, err := store.GetTicket(ctx, id)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"id": id,
}).Error("failed to get the ticket")
return nil, err
}

return ticket, nil
return s.store.GetTicket(ctx, req.GetTicketId())
}

// WatchAssignments stream back Assignment of the specified TicketId if it is updated.
Expand Down Expand Up @@ -197,7 +172,6 @@ func doWatchAssignments(ctx context.Context, id string, sender func(*pb.Assignme

err := sender(currAssignment)
if err != nil {
logger.WithError(err).Error("failed to send Redis response to grpc server")
return status.Errorf(codes.Aborted, err.Error())
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/frontend/frontend_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func TestDoGetTicket(t *testing.T) {

test.preAction(ctx, cancel, store)

ticket, err := doGetTickets(ctx, fakeTicket.GetId(), store)
ticket, err := store.GetTicket(ctx, fakeTicket.GetId())
require.Equal(t, test.wantCode.String(), status.Convert(err).Code().String())

if err == nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/app/query/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *queryService) QueryTickets(req *pb.QueryTicketsRequest, responseServer
}
})
if err != nil {
logger.WithError(err).Error("Failed to run request.")
err = errors.Wrap(err, "QueryTickets: failed to run request")
return err
}
stats.Record(ctx, ticketsPerQuery.M(int64(len(results))))
Expand Down Expand Up @@ -111,7 +111,7 @@ func (s *queryService) QueryTicketIds(req *pb.QueryTicketIdsRequest, responseSer
}
})
if err != nil {
logger.WithError(err).Error("Failed to run request.")
err = errors.Wrap(err, "QueryTicketIds: failed to run request")
return err
}
stats.Record(ctx, ticketsPerQuery.M(int64(len(results))))
Expand Down
18 changes: 17 additions & 1 deletion internal/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ type ServerParams struct {

// NewServerParamsFromConfig returns server Params initialized from the configuration file.
func NewServerParamsFromConfig(cfg config.View, prefix string, listen func(network, address string) (net.Listener, error)) (*ServerParams, error) {
serverLogger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": prefix,
})

grpcL, err := listen("tcp", fmt.Sprintf(":%d", cfg.GetInt(prefix+".grpcport")))
if err != nil {
return nil, errors.Wrap(err, "can't start listener for grpc")
Expand Down Expand Up @@ -283,7 +288,7 @@ func newGRPCServerOptions(params *ServerParams) []grpc.ServerOption {
}
}

// TODO: add unary interceptor once all redundant logs are removed
ui = append(ui, serverUnaryInterceptor)
si = append(si, serverStreamInterceptor)

if params.enableMetrics {
Expand Down Expand Up @@ -311,3 +316,14 @@ func serverStreamInterceptor(srv interface{},
}
return err
}

func serverUnaryInterceptor(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
h, err := handler(ctx, req)
if err != nil {
serverLogger.Error(err)
}
return h, err
}
31 changes: 17 additions & 14 deletions internal/statestore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ import (
"open-match.dev/open-match/pkg/pb"
)

const allTickets = "allTickets"
const (
allTickets = "allTickets"
proposedTicketIDs = "proposed_ticket_ids"
)

var (
redisLogger = logrus.WithFields(logrus.Fields{
Expand Down Expand Up @@ -255,7 +258,7 @@ func (rb *redisBackend) GetTicket(ctx context.Context, id string) (*pb.Ticket, e
if err != nil {
// Return NotFound if redigo did not find the ticket in storage.
if err == redis.ErrNil {
msg := fmt.Sprintf("Ticket id:%s not found", id)
msg := fmt.Sprintf("Ticket id: %s not found", id)
return nil, status.Error(codes.NotFound, msg)
}

Expand All @@ -264,7 +267,7 @@ func (rb *redisBackend) GetTicket(ctx context.Context, id string) (*pb.Ticket, e
}

if value == nil {
msg := fmt.Sprintf("Ticket id:%s not found", id)
msg := fmt.Sprintf("Ticket id: %s not found", id)
return nil, status.Error(codes.NotFound, msg)
}

Expand Down Expand Up @@ -343,7 +346,7 @@ func (rb *redisBackend) GetIndexedIDSet(ctx context.Context) (map[string]struct{
startTimeInt := curTime.Add(-ttl).UnixNano()

// Filter out tickets that are fetched but not assigned within ttl time (ms).
idsInPendingReleases, err := redis.Strings(redisConn.Do("ZRANGEBYSCORE", "proposed_ticket_ids", startTimeInt, endTimeInt))
idsInPendingReleases, err := redis.Strings(redisConn.Do("ZRANGEBYSCORE", proposedTicketIDs, startTimeInt, endTimeInt))
if err != nil {
return nil, status.Errorf(codes.Internal, "error getting pending release %v", err)
}
Expand Down Expand Up @@ -413,6 +416,12 @@ func (rb *redisBackend) UpdateAssignments(ctx context.Context, req *pb.AssignTic
return resp, []*pb.Ticket{}, nil
}

redisConn, err := rb.redisPool.GetContext(ctx)
if err != nil {
return nil, nil, status.Errorf(codes.Unavailable, "UpdateAssignments, failed to connect to redis: %v", err)
}
defer handleConnectionClose(&redisConn)

idToA := make(map[string]*pb.Assignment)
ids := make([]string, 0)
idsI := make([]interface{}, 0)
Expand All @@ -423,7 +432,7 @@ func (rb *redisBackend) UpdateAssignments(ctx context.Context, req *pb.AssignTic

for _, id := range a.TicketIds {
if _, ok := idToA[id]; ok {
return nil, nil, status.Errorf(codes.InvalidArgument, "Ticket id %s is assigned multiple times in one assign tickets call.", id)
return nil, nil, status.Errorf(codes.InvalidArgument, "Ticket id %s is assigned multiple times in one assign tickets call", id)
}

idToA[id] = a.Assignment
Expand All @@ -432,12 +441,6 @@ func (rb *redisBackend) UpdateAssignments(ctx context.Context, req *pb.AssignTic
}
}

redisConn, err := rb.redisPool.GetContext(ctx)
if err != nil {
return nil, nil, status.Errorf(codes.Unavailable, "UpdateAssignments, failed to connect to redis: %v", err)
}
defer handleConnectionClose(&redisConn)

ticketBytes, err := redis.ByteSlices(redisConn.Do("MGET", idsI...))
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -557,7 +560,7 @@ func (rb *redisBackend) AddTicketsToPendingRelease(ctx context.Context, ids []st

currentTime := time.Now().UnixNano()
cmds := make([]interface{}, 0, 2*len(ids)+1)
cmds = append(cmds, "proposed_ticket_ids")
cmds = append(cmds, proposedTicketIDs)
for _, id := range ids {
cmds = append(cmds, currentTime, id)
}
Expand All @@ -584,7 +587,7 @@ func (rb *redisBackend) DeleteTicketsFromPendingRelease(ctx context.Context, ids
defer handleConnectionClose(&redisConn)

cmds := make([]interface{}, 0, len(ids)+1)
cmds = append(cmds, "proposed_ticket_ids")
cmds = append(cmds, proposedTicketIDs)
for _, id := range ids {
cmds = append(cmds, id)
}
Expand All @@ -605,7 +608,7 @@ func (rb *redisBackend) ReleaseAllTickets(ctx context.Context) error {
}
defer handleConnectionClose(&redisConn)

_, err = redisConn.Do("DEL", "proposed_ticket_ids")
_, err = redisConn.Do("DEL", proposedTicketIDs)
return err
}

Expand Down

0 comments on commit 6d35482

Please sign in to comment.