Update Scale package to sync with the latest config and API changes (#…
yfei1 committed Nov 20, 2019
1 parent a75833b commit 8554601
Showing 12 changed files with 376 additions and 277 deletions.
23 changes: 0 additions & 23 deletions cmd/scale-frontend/main.go

This file was deleted.

10 changes: 5 additions & 5 deletions examples/functions/golang/rosterbased/mmf/matchfunction.go
@@ -67,7 +67,7 @@ func makeMatches(p *pb.MatchProfile, poolTickets map[string][]*pb.Ticket) ([]*pb
// populated roster specifying the empty slots for each pool name and also
// have the ticket pools referenced in the roster. It generates matches by
// populating players from the specified pools into rosters.
wantTickets, err := wantPoolTickets(p.Rosters)
wantTickets, err := wantPoolTickets(p.GetRosters())
if err != nil {
return nil, err
}
@@ -133,16 +133,16 @@ func makeMatches(p *pb.MatchProfile, poolTickets map[string][]*pb.Ticket) ([]*pb
func wantPoolTickets(rosters []*pb.Roster) (map[string]int, error) {
wantTickets := make(map[string]int)
for _, r := range rosters {
if _, ok := wantTickets[r.Name]; ok {
if _, ok := wantTickets[r.GetName()]; ok {
// We do not expect multiple Roster Pools to have the same name.
logger.Errorf("multiple rosters with same name not supported")
return nil, status.Error(codes.InvalidArgument, "multiple rosters with same name not supported")
}

wantTickets[r.Name] = 0
for _, slot := range r.TicketIds {
wantTickets[r.GetName()] = 0
for _, slot := range r.GetTicketIds() {
if slot == emptyRosterSpot {
wantTickets[r.Name] = wantTickets[r.Name] + 1
wantTickets[r.GetName()] = wantTickets[r.GetName()] + 1
}
}
}
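
The change in this file is the switch from direct field access (p.Rosters, r.Name, r.TicketIds) to the generated getters, which is more than style: protoc-gen-go getters are nil-safe and return the zero value when the receiver is nil. A minimal illustration of that behavior (not part of this commit), assuming the standard generated pb package:

package main

import (
	"fmt"

	"open-match.dev/open-match/pkg/pb"
)

func main() {
	// Direct access to r.Name or r.TicketIds would panic on a nil *pb.Roster,
	// but the generated getters simply return zero values.
	var r *pb.Roster
	fmt.Println(r.GetName())           // prints "", no panic on the nil receiver
	fmt.Println(len(r.GetTicketIds())) // prints 0, the nil slice is safe to range over
}
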
43 changes: 41 additions & 2 deletions examples/functions/golang/rosterbased/mmf/server.go
@@ -17,25 +17,64 @@ package mmf
import (
"fmt"
"net"
"time"

grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"open-match.dev/open-match/pkg/pb"
)

// matchFunctionService implements pb.MatchFunctionServer, the server generated
func init() {
// Using gRPC's DNS resolver to create clients.
// This is a workaround for load balancing gRPC applications under k8s environments.
// See https://kubernetes.io/blog/2018/11/07/grpc-load-balancing-on-kubernetes-without-tears/ for more details.
// https://godoc.org/google.golang.org/grpc/resolver#SetDefaultScheme
resolver.SetDefaultScheme("dns")
}

// MatchFunctionService implements pb.MatchFunctionServer, the server generated
// by compiling the protobuf, by fulfilling the pb.MatchFunctionServer interface.
type MatchFunctionService struct {
grpc *grpc.Server
mmlogicClient pb.MmLogicClient
port int
}

func newGRPCDialOptions() []grpc.DialOption {
grpcLogger := logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "grpc.client",
})
grpcLogger.Level = logrus.DebugLevel

si := []grpc.StreamClientInterceptor{grpc_logrus.StreamClientInterceptor(grpcLogger)}
ui := []grpc.UnaryClientInterceptor{grpc_logrus.UnaryClientInterceptor(grpcLogger)}
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(si...)),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(ui...)),
grpc.WithBalancerName(roundrobin.Name),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 20 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
}),
}

return opts
}

// Start creates and starts the Match Function server and also connects to Open
// Match's mmlogic service. This connection is used at runtime to fetch tickets
// for pools specified in MatchProfile.
func Start(mmlogicAddr string, serverPort int) error {
conn, err := grpc.Dial(mmlogicAddr, grpc.WithInsecure())
conn, err := grpc.Dial(mmlogicAddr, newGRPCDialOptions()...)
if err != nil {
logger.Fatalf("Failed to connect to Open Match, got %v", err)
}
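
Taken together, the init() call to resolver.SetDefaultScheme("dns"), the round-robin balancer, and the keepalive parameters are the usual workaround for gRPC pinning all traffic to a single pod behind a Kubernetes Service: the client resolves the individual pod IPs itself and spreads RPCs across them. A minimal usage sketch, assuming it lives alongside newGRPCDialOptions in the mmf package; the in-cluster address is hypothetical:

package mmf

import (
	"google.golang.org/grpc"
)

// dialMmlogic shows how the dial options above are consumed. Because the DNS
// scheme is the default resolver, a plain host:port target is resolved to all
// backing addresses (e.g. the pods of a headless Service) and the round-robin
// balancer rotates RPCs across them.
func dialMmlogic() (*grpc.ClientConn, error) {
	return grpc.Dial("om-mmlogic.open-match.svc.cluster.local:50503", newGRPCDialOptions()...)
}
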
159 changes: 118 additions & 41 deletions examples/scale/backend/backend.go
@@ -24,7 +24,11 @@ import (
"time"

"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"open-match.dev/open-match/examples/scale/profiles"
"open-match.dev/open-match/examples/scale/tickets"
"open-match.dev/open-match/internal/config"
"open-match.dev/open-match/internal/logging"
"open-match.dev/open-match/internal/rpc"
@@ -37,10 +41,10 @@ var (
"component": "scale.backend",
})

// TODO: Add metrics to track matches created, tickets assigned, deleted.
matchCount uint64
assigned uint64
deleted uint64
failed uint64
)

// Run triggers execution of functions that continuously fetch, assign and
@@ -70,70 +74,134 @@ func Run() {
defer feConn.Close()
fe := pb.NewFrontendClient(feConn)

// The buffered channels attempt to decouple fetch, assign and delete. It is
// best effort and these operations may still block each other if buffers are full.
matches := make(chan *pb.Match, 1000)
deleteIds := make(chan string, 1000)
for i := 0; i < 10; i++ {
errMap := &sync.Map{}

go doFetch(cfg, be, matches)
go doAssign(be, matches, deleteIds)
go doDelete(fe, deleteIds)
doCreate(fe, errMap)
doFetch(cfg, be, errMap)

// The above goroutines run forever and so the main goroutine needs to block.
errMap.Range(func(k interface{}, v interface{}) bool {
logger.Infof("Got error %s: %#v", k, v)
return true
})
logger.Infof("%d round completes\n", i)
time.Sleep(time.Second * 5)
}
select {}

}

// doFetch continuously fetches all profiles in a loop and queues up the fetched
// matches for assignment.
func doFetch(cfg config.View, be pb.BackendClient, matches chan *pb.Match) {
startTime := time.Now()
mprofiles := profiles.Generate(cfg)
for {
func doCreate(fe pb.FrontendClient, errMap *sync.Map) {
var created uint64
var failed uint64
start := time.Now()
for created < 5000 {
var wg sync.WaitGroup
for _, p := range mprofiles {
for i := 0; i < 500; i++ {
wg.Add(1)
p := p
go func(wg *sync.WaitGroup) {
go func(wg *sync.WaitGroup, i int) {
defer wg.Done()
fetch(be, p, matches)
}(&wg)
req := &pb.CreateTicketRequest{
Ticket: tickets.Ticket(),
}

ctx, span := trace.StartSpan(context.Background(), "scale.backend/CreateTicket")
defer span.End()

_, err := fe.CreateTicket(ctx, req)
if err != nil {
errMsg := fmt.Sprintf("failed to create a ticket: %w", err)
errRead, ok := errMap.Load(errMsg)
if !ok {
errRead = 0
}
errMap.Store(errMsg, errRead.(int)+1)
atomic.AddUint64(&failed, 1)
}
atomic.AddUint64(&created, 1)
}(&wg, i)
}

// Wait for all FetchMatches calls to complete before proceeding.
// Wait for all concurrent creates to complete.
wg.Wait()
logger.Infof("FetchedMatches:%v, AssignedTickets:%v, DeletedTickets:%v in time %v", atomic.LoadUint64(&matchCount), atomic.LoadUint64(&assigned), atomic.LoadUint64(&deleted), time.Since(startTime))
}
logger.Infof("%v tickets created, %v failed in %v", created, failed, time.Since(start))
}

func fetch(be pb.BackendClient, p *pb.MatchProfile, matches chan *pb.Match) {
req := &pb.FetchMatchesRequest{
Config: &pb.FunctionConfig{
Host: "om-function",
Port: 50502,
Type: pb.FunctionConfig_GRPC,
},
Profiles: []*pb.MatchProfile{p},
// doFetch continuously fetches all profiles in a loop and queues up the fetched
// matches for assignment.
func doFetch(cfg config.View, be pb.BackendClient, errMap *sync.Map) {
startTime := time.Now()
mprofiles := profiles.Generate(cfg)
time.Sleep(time.Second * 1)
var wg sync.WaitGroup
for i, p := range mprofiles {
wg.Add(1)
p := p
go func(wg *sync.WaitGroup, i int) {
defer wg.Done()
fetch(be, p, i, errMap)
}(&wg, i)
}

stream, err := be.FetchMatches(context.Background(), req)
if err != nil {
logger.Errorf("FetchMatches failed, got %v", err)
return
}
// Wait for all FetchMatches calls to complete before proceeding.
wg.Wait()
logger.Infof(
"FetchedMatches:%v, AssignedTickets:%v, DeletedTickets:%v in time %v, Total profiles: %v, Failures: %v",
atomic.LoadUint64(&matchCount),
atomic.LoadUint64(&assigned),
atomic.LoadUint64(&deleted),
time.Since(startTime).Milliseconds(),
len(mprofiles),
atomic.LoadUint64(&failed),
)
}

func fetch(be pb.BackendClient, p *pb.MatchProfile, i int, errMap *sync.Map) {
for {
resp, err := stream.Recv()
if err == io.EOF {
return
req := &pb.FetchMatchesRequest{
Config: &pb.FunctionConfig{
Host: "om-function",
Port: 50502,
Type: pb.FunctionConfig_GRPC,
},
Profiles: []*pb.MatchProfile{p},
}

ctx, span := trace.StartSpan(context.Background(), "scale.backend/FetchMatches")
defer span.End()

stream, err := be.FetchMatches(ctx, req)
if err != nil {
logger.Errorf("FetchMatches failed, got %v", err)
errMsg := fmt.Sprintf("failed to get available stream client: %w", err)
errRead, ok := errMap.Load(errMsg)
if !ok {
errRead = 0
}
errMap.Store(errMsg, errRead.(int)+1)
atomic.AddUint64(&failed, 1)
return
}

matches <- resp.GetMatch()
atomic.AddUint64(&matchCount, 1)
for {
_, err := stream.Recv()
if err == io.EOF {
return
}

if err != nil {
errMsg := fmt.Sprintf("failed to stream in the halfway: %w", err)
errRead, ok := errMap.Load(errMsg)
if !ok {
errRead = 0
}
errMap.Store(errMsg, errRead.(int)+1)
atomic.AddUint64(&failed, 1)
return
}

atomic.AddUint64(&matchCount, 1)
}
}
}
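
One subtlety in the error bookkeeping above: the Load-then-Store sequence on the sync.Map is not atomic, so two goroutines recording the same message at the same time can lose a count. A race-free variant (a sketch, not part of this commit, with the package name assumed from the file path) keeps a *uint64 per message via LoadOrStore and bumps it atomically:

package backend

import (
	"sync"
	"sync/atomic"
)

// countError tallies one occurrence of msg in errMap without dropping updates
// under concurrency: LoadOrStore installs the counter exactly once, and
// atomic.AddUint64 increments it safely from any goroutine.
func countError(errMap *sync.Map, msg string) {
	counter, _ := errMap.LoadOrStore(msg, new(uint64))
	atomic.AddUint64(counter.(*uint64), 1)
}
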

@@ -168,6 +236,7 @@ func doAssign(be pb.BackendClient, matches chan *pb.Match, deleteIds chan string

// doDelete deletes all the tickets whose ids get added to the deleteIds channel.
func doDelete(fe pb.FrontendClient, deleteIds chan string) {
logger.Infof("Starts doDelete, deleteIds len is: %d", len(deleteIds))
for id := range deleteIds {
req := &pb.DeleteTicketRequest{
TicketId: id,
@@ -181,3 +250,11 @@ func doDelete(fe pb.FrontendClient, deleteIds chan string) {
atomic.AddUint64(&deleted, 1)
}
}

func unavailable(err error) bool {
if status.Convert(err).Code() == codes.Unavailable {
return true
}

return false
}
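
The new unavailable helper is not exercised in the hunks shown here. One plausible use (purely illustrative, with a made-up retry policy, assumed to sit in the same package) is retrying transient codes.Unavailable failures from the frontend before counting them as hard errors:

package backend

import (
	"context"
	"time"

	"open-match.dev/open-match/pkg/pb"
)

// retryCreateTicket sketches how unavailable might be used: retry CreateTicket
// a few times while the frontend is temporarily unreachable, and give up
// immediately on any other error.
func retryCreateTicket(fe pb.FrontendClient, req *pb.CreateTicketRequest) error {
	var err error
	for attempt := 0; attempt < 3; attempt++ {
		_, err = fe.CreateTicket(context.Background(), req)
		if err == nil || !unavailable(err) {
			return err
		}
		time.Sleep(time.Second)
	}
	return err
}
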
