Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added sync.Map to backend and synchronizer #1078

Merged
merged 4 commits into from Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 8 additions & 6 deletions internal/app/backend/backend_service.go
Expand Up @@ -74,19 +74,20 @@ func (s *backendService) FetchMatches(req *pb.FetchMatchesRequest, stream pb.Bac
// Closed when mmfs should start.
startMmfs := make(chan struct{})
proposals := make(chan *pb.Match)
mc := &sync.Map{}
wg := sync.WaitGroup{}
wg.Add(3)

var synchronizeSendErr error
go func() {
defer wg.Done()
synchronizeSendErr = synchronizeSend(mmfCtx, proposals, syncStream)
synchronizeSendErr = synchronizeSend(mmfCtx, mc, proposals, syncStream)
}()

var synchronizeRecvErr error
go func() {
defer wg.Done()
synchronizeRecvErr = synchronizeRecv(syncStream, stream, startMmfs, cancelMmfs)
synchronizeRecvErr = synchronizeRecv(syncStream, mc, stream, startMmfs, cancelMmfs)
cancelMmfs()
}()

Expand Down Expand Up @@ -125,7 +126,7 @@ func (s *backendService) FetchMatches(req *pb.FetchMatchesRequest, stream pb.Bac
return nil
}

func synchronizeSend(ctx context.Context, proposals <-chan *pb.Match, syncStream synchronizerStream) error {
func synchronizeSend(ctx context.Context, mc *sync.Map, proposals <-chan *pb.Match, syncStream synchronizerStream) error {
sendProposals:
for {
select {
Expand All @@ -135,6 +136,7 @@ sendProposals:
if !ok {
break sendProposals
}
mc.Store(p.GetMatchId(), p)
telemetry.RecordUnitMeasurement(ctx, mMatchesSentToEvaluation)
err := syncStream.Send(&ipb.SynchronizeRequest{Proposal: p})
if err != nil {
Expand All @@ -150,7 +152,7 @@ sendProposals:
return nil
}

func synchronizeRecv(syncStream synchronizerStream, stream pb.BackendService_FetchMatchesServer, startMmfs chan<- struct{}, cancelMmfs context.CancelFunc) error {
func synchronizeRecv(syncStream synchronizerStream, mc *sync.Map, stream pb.BackendService_FetchMatchesServer, startMmfs chan<- struct{}, cancelMmfs context.CancelFunc) error {
var startMmfsOnce sync.Once

for {
Expand All @@ -172,9 +174,9 @@ func synchronizeRecv(syncStream synchronizerStream, stream pb.BackendService_Fet
cancelMmfs()
}

if resp.Match != nil {
if match, ok := mc.Load(resp.GetMatch().GetMatchId()); ok {
telemetry.RecordUnitMeasurement(stream.Context(), mMatchesFetched)
err = stream.Send(&pb.FetchMatchesResponse{Match: resp.Match})
err = stream.Send(&pb.FetchMatchesResponse{Match: match.(*pb.Match)})
if err != nil {
return fmt.Errorf("error sending match to caller of backend: %w", err)
}
Expand Down
39 changes: 34 additions & 5 deletions internal/app/synchronizer/synchronizer_service.go
Expand Up @@ -201,6 +201,9 @@ func (s *synchronizerService) runCycle() {

m2c := make(chan mAndM6c)
m3c := make(chan *pb.Match)
// TODO: Let's call it m3point5c for now, will do the renaming in a later PR.
m3point5c := make(chan *pb.Match)
// TODO: This will be a channel of string after the proto change, will do it in a later PR.
m4c := make(chan *pb.Match)
m5c := make(chan *pb.Match)

Expand All @@ -221,9 +224,12 @@ func (s *synchronizerService) runCycle() {
close(r.m6c)
}
}()
go s.wrapEvaluator(ctx, cancel, bufferChannel(m3c), m4c)

matchTickets := &sync.Map{}
go s.cacheMatchIDToTicketIDs(matchTickets, m3c, m3point5c)
go s.wrapEvaluator(ctx, cancel, bufferChannel(m3point5c), m4c)
go func() {
s.addMatchesToIgnoreList(ctx, cancel, bufferChannel(m4c), m5c)
s.addMatchesToIgnoreList(ctx, matchTickets, cancel, bufferChannel(m4c), m5c)
// Wait for ignore list, but not all matches returned, the next cycle
// can start now.
close(closedOnCycleEnd)
Expand Down Expand Up @@ -399,19 +405,42 @@ func (s *synchronizerService) wrapEvaluator(ctx context.Context, cancel cancelEr
///////////////////////////////////////
///////////////////////////////////////

func (s *synchronizerService) cacheMatchIDToTicketIDs(m *sync.Map, m3c <-chan *pb.Match, m3point5c chan<- *pb.Match) {
for match := range m3c {
m.Store(match.GetMatchId(), getTicketIds(match.GetTickets()))
m3point5c <- match
}
close(m3point5c)
}

func getTicketIds(tickets []*pb.Ticket) []string {
tids := []string{}
for _, ticket := range tickets {
tids = append(tids, ticket.GetId())
}
return tids
}

///////////////////////////////////////
///////////////////////////////////////

// Calls statestore to add all of the tickets returned by the evaluator to the
// ignorelist. If it partially fails for whatever reason (not all tickets will
// nessisarily be in the same call), only the matches which can be safely
// returned to the Synchronize calls are.
func (s *synchronizerService) addMatchesToIgnoreList(ctx context.Context, cancel cancelErrFunc, m4c <-chan []*pb.Match, m5c chan<- *pb.Match) {
func (s *synchronizerService) addMatchesToIgnoreList(ctx context.Context, m *sync.Map, cancel cancelErrFunc, m4c <-chan []*pb.Match, m5c chan<- *pb.Match) {
totalMatches := 0
successfulMatches := 0
var lastErr error
for matches := range m4c {
// For now assume m4c is a channel of MatchIds, will fix it in a later PR.
ids := []string{}
for _, match := range matches {
for _, ticket := range match.GetTickets() {
ids = append(ids, ticket.GetId())
tids, ok := m.Load(match.GetMatchId())
if ok {
ids = append(ids, tids.([]string)...)
} else {
logger.Errorf("failed to get MatchId %s with its corresponding tickets from the cache", match.GetMatchId())
}
}

Expand Down