Skip to content

Commit

Permalink
Fix concurrency issue on updating user last_indexed_at
Browse files Browse the repository at this point in the history
ref DEV-1270
  • Loading branch information
louischan-oursky committed May 8, 2024
2 parents e70fbc3 + 1c4a6f5 commit 44119ba
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 20 deletions.
1 change: 1 addition & 0 deletions cmd/authgear/elasticsearch/deps.go
Expand Up @@ -37,6 +37,7 @@ var DependencySet = wire.NewSet(
config.NewDefaultDatabaseEnvironmentConfig,
NewGlobalDatabaseCredentials,
NewEmptyIdentityConfig,
NewReindexedTimestamps,
globaldb.DependencySet,
appdb.NewHandle,
appdb.DependencySet,
Expand Down
79 changes: 66 additions & 13 deletions cmd/authgear/elasticsearch/reindex.go
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"log"
"os"
"sync"
"time"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
Expand Down Expand Up @@ -43,14 +45,59 @@ type Item struct {
Cursor model.PageCursor
}

// ReindexedTimestamp pairs a user ID with the timestamp that should be
// recorded for that user once the timestamp buffer is flushed.
type ReindexedTimestamp struct {
	// UserID is the ID of the user the timestamp belongs to.
	UserID string
	// ReindexedAt is the value Flush hands to UpdateLastIndexedAt.
	ReindexedAt time.Time
}

// ReindexedTimestamps is a mutex-guarded buffer of ReindexedTimestamp
// entries. Entries are added with Append and written out with Flush.
// Because it embeds a sync.Mutex it must be shared by pointer, never copied.
type ReindexedTimestamps struct {
	// mutex guards timestamps; both Append and Flush take it.
	mutex sync.Mutex
	// timestamps holds the entries appended since the last successful Flush.
	timestamps []*ReindexedTimestamp
}

// NewReindexedTimestamps returns an empty buffer with an unlocked mutex and
// a non-nil, zero-length timestamp slice.
func NewReindexedTimestamps() *ReindexedTimestamps {
	buffer := new(ReindexedTimestamps)
	buffer.mutex = sync.Mutex{}
	buffer.timestamps = make([]*ReindexedTimestamp, 0)
	return buffer
}

// Append queues a (userID, timestamp) pair to be written by a later Flush.
// The buffer's mutex is held while the slice is modified.
func (r *ReindexedTimestamps) Append(userID string, timestamp time.Time) {
	// Build the entry before taking the lock; only the slice is shared state.
	entry := &ReindexedTimestamp{UserID: userID, ReindexedAt: timestamp}

	r.mutex.Lock()
	defer r.mutex.Unlock()
	r.timestamps = append(r.timestamps, entry)
}

// Flush writes every queued timestamp through userStore.UpdateLastIndexedAt,
// issuing one dbHandle.WithTx call per entry, in the order the entries were
// appended. The buffer's mutex is held for the whole call, so concurrent
// Append calls wait until Flush returns.
//
// On success the buffer is emptied and nil is returned. If an update fails,
// Flush stops at that entry: entries that were already written are dropped
// from the buffer, the failing entry and everything after it stay queued for
// a later Flush, and the returned error wraps the underlying error with the
// ID of the user whose update failed.
func (r *ReindexedTimestamps) Flush(
	dbHandle *appdb.Handle,
	userStore *user.Store) error {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	for i, t := range r.timestamps {
		err := dbHandle.WithTx(func() error {
			return userStore.UpdateLastIndexedAt([]string{t.UserID}, t.ReindexedAt)
		})
		if err != nil {
			// Keep only what has not been persisted yet so a retry does not
			// redo the updates that already succeeded.
			r.timestamps = r.timestamps[i:]
			return fmt.Errorf("update last_indexed_at of user %s: %w", t.UserID, err)
		}
	}

	r.timestamps = []*ReindexedTimestamp{}
	return nil
}

type Reindexer struct {
Clock clock.Clock
Handle *appdb.Handle
AppID config.AppID
Users *user.Store
OAuth *identityoauth.Store
LoginID *identityloginid.Store
RolesGroups *rolesgroups.Store
Clock clock.Clock
Handle *appdb.Handle
AppID config.AppID
Users *user.Store
OAuth *identityoauth.Store
LoginID *identityloginid.Store
RolesGroups *rolesgroups.Store
ReindexedTimestamps *ReindexedTimestamps
}

func (q *Reindexer) QueryPage(after model.PageCursor, first uint64) ([]Item, error) {
Expand Down Expand Up @@ -159,6 +206,12 @@ func (q *Reindexer) Reindex(es *elasticsearch.Client) (err error) {
return err
}

// Flush timestamps once after the bulk indexer has been closed to ensure all rows are updated
err = q.ReindexedTimestamps.Flush(q.Handle, q.Users)
if err != nil {
return
}

stats := bulkIndexer.Stats()
log.Printf("App (%v): %v indexed; %v deleted; %v failed\n", q.AppID, stats.NumIndexed, stats.NumDeleted, stats.NumFailed)
return nil
Expand Down Expand Up @@ -226,19 +279,19 @@ func (q *Reindexer) reindex(ctx context.Context, bulkIndexer esutil.BulkIndexer)
ctx context.Context,
item esutil.BulkIndexerItem,
res esutil.BulkIndexerResponseItem) {
err := q.Handle.WithTx(func() error {
return q.Users.UpdateLastIndexedAt([]string{source.ID}, startAt)
})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to update last_indexed_at of user %s. %v\n", source.ID, err)
}
q.ReindexedTimestamps.Append(source.ID, startAt)
},
},
)
if err != nil {
return
}
}

err = q.ReindexedTimestamps.Flush(q.Handle, q.Users)
if err != nil {
return
}
}

return allUserIDs, nil
Expand Down
16 changes: 9 additions & 7 deletions cmd/authgear/elasticsearch/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 44119ba

Please sign in to comment.