Skip to content

Commit

Permalink
Added sync.Map to backend and synchronizer (#1078)
Browse files Browse the repository at this point in the history
This is an intermediate step to resolve #939. Leaving a bunch of TODOs in this PR and will fix them after the proto change.
  • Loading branch information
yfei1 committed Jan 30, 2020
1 parent c92c4ef commit edade67
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
14 changes: 8 additions & 6 deletions internal/app/backend/backend_service.go
Expand Up @@ -74,19 +74,20 @@ func (s *backendService) FetchMatches(req *pb.FetchMatchesRequest, stream pb.Bac
// Closed when mmfs should start.
startMmfs := make(chan struct{})
proposals := make(chan *pb.Match)
m := &sync.Map{}
wg := sync.WaitGroup{}
wg.Add(3)

var synchronizeSendErr error
go func() {
defer wg.Done()
synchronizeSendErr = synchronizeSend(mmfCtx, proposals, syncStream)
synchronizeSendErr = synchronizeSend(mmfCtx, m, proposals, syncStream)
}()

var synchronizeRecvErr error
go func() {
defer wg.Done()
synchronizeRecvErr = synchronizeRecv(syncStream, stream, startMmfs, cancelMmfs)
synchronizeRecvErr = synchronizeRecv(syncStream, m, stream, startMmfs, cancelMmfs)
cancelMmfs()
}()

Expand Down Expand Up @@ -125,7 +126,7 @@ func (s *backendService) FetchMatches(req *pb.FetchMatchesRequest, stream pb.Bac
return nil
}

func synchronizeSend(ctx context.Context, proposals <-chan *pb.Match, syncStream synchronizerStream) error {
func synchronizeSend(ctx context.Context, m *sync.Map, proposals <-chan *pb.Match, syncStream synchronizerStream) error {
sendProposals:
for {
select {
Expand All @@ -135,6 +136,7 @@ sendProposals:
if !ok {
break sendProposals
}
m.Store(p.GetMatchId(), p)
telemetry.RecordUnitMeasurement(ctx, mMatchesSentToEvaluation)
err := syncStream.Send(&ipb.SynchronizeRequest{Proposal: p})
if err != nil {
Expand All @@ -150,7 +152,7 @@ sendProposals:
return nil
}

func synchronizeRecv(syncStream synchronizerStream, stream pb.BackendService_FetchMatchesServer, startMmfs chan<- struct{}, cancelMmfs context.CancelFunc) error {
func synchronizeRecv(syncStream synchronizerStream, m *sync.Map, stream pb.BackendService_FetchMatchesServer, startMmfs chan<- struct{}, cancelMmfs context.CancelFunc) error {
var startMmfsOnce sync.Once

for {
Expand All @@ -172,9 +174,9 @@ func synchronizeRecv(syncStream synchronizerStream, stream pb.BackendService_Fet
cancelMmfs()
}

if resp.Match != nil {
if match, ok := m.Load(resp.GetMatch().GetMatchId()); ok {
telemetry.RecordUnitMeasurement(stream.Context(), mMatchesFetched)
err = stream.Send(&pb.FetchMatchesResponse{Match: resp.Match})
err = stream.Send(&pb.FetchMatchesResponse{Match: match.(*pb.Match)})
if err != nil {
return fmt.Errorf("error sending match to caller of backend: %w", err)
}
Expand Down
39 changes: 34 additions & 5 deletions internal/app/synchronizer/synchronizer_service.go
Expand Up @@ -201,6 +201,9 @@ func (s *synchronizerService) runCycle() {

m2c := make(chan mAndM6c)
m3c := make(chan *pb.Match)
// TODO: Let's call it m3point5c for now, will do the renaming in a later PR.
m3point5c := make(chan *pb.Match)
// TODO: This will be a channel of string after the proto change, will do it in a later PR.
m4c := make(chan *pb.Match)
m5c := make(chan *pb.Match)

Expand All @@ -221,9 +224,12 @@ func (s *synchronizerService) runCycle() {
close(r.m6c)
}
}()
go s.wrapEvaluator(ctx, cancel, bufferChannel(m3c), m4c)

matchTickets := &sync.Map{}
go s.cacheMatchIDToTicketIDs(matchTickets, m3c, m3point5c)
go s.wrapEvaluator(ctx, cancel, bufferChannel(m3point5c), m4c)
go func() {
s.addMatchesToIgnoreList(ctx, cancel, bufferChannel(m4c), m5c)
s.addMatchesToIgnoreList(ctx, matchTickets, cancel, bufferChannel(m4c), m5c)
// Wait for ignore list, but not all matches returned, the next cycle
// can start now.
close(closedOnCycleEnd)
Expand Down Expand Up @@ -399,19 +405,42 @@ func (s *synchronizerService) wrapEvaluator(ctx context.Context, cancel cancelEr
///////////////////////////////////////
///////////////////////////////////////

func (s *synchronizerService) cacheMatchIDToTicketIDs(m *sync.Map, m3c <-chan *pb.Match, m3point5c chan<- *pb.Match) {
for match := range m3c {
m.Store(match.GetMatchId(), getTicketIds(match.GetTickets()))
m3point5c <- match
}
close(m3point5c)
}

func getTicketIds(tickets []*pb.Ticket) []string {
tids := []string{}
for _, ticket := range tickets {
tids = append(tids, ticket.GetId())
}
return tids
}

///////////////////////////////////////
///////////////////////////////////////

// Calls statestore to add all of the tickets returned by the evaluator to the
// ignorelist. If it partially fails for whatever reason (not all tickets will
// nessisarily be in the same call), only the matches which can be safely
// returned to the Synchronize calls are.
func (s *synchronizerService) addMatchesToIgnoreList(ctx context.Context, cancel cancelErrFunc, m4c <-chan []*pb.Match, m5c chan<- *pb.Match) {
func (s *synchronizerService) addMatchesToIgnoreList(ctx context.Context, m *sync.Map, cancel cancelErrFunc, m4c <-chan []*pb.Match, m5c chan<- *pb.Match) {
totalMatches := 0
successfulMatches := 0
var lastErr error
for matches := range m4c {
// For now assume m4c is a channel of MatchIds, will fix it in a later PR.
ids := []string{}
for _, match := range matches {
for _, ticket := range match.GetTickets() {
ids = append(ids, ticket.GetId())
tids, ok := m.Load(match.GetMatchId())
if ok {
ids = append(ids, tids.([]string)...)
} else {
logger.Errorf("failed to get MatchId %s with its corresponding tickets from the cache", match.GetMatchId())
}
}

Expand Down

0 comments on commit edade67

Please sign in to comment.