Skip to content

Commit

Permalink
fix purge Redis and load feedback from MongoDB (#571)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz committed Oct 17, 2022
1 parent 5154376 commit 0ce9ff6
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 32 deletions.
27 changes: 13 additions & 14 deletions storage/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,22 +253,21 @@ func (r *Redis) purge(ctx context.Context, client redis.UniversalClient, isClust
if err != nil {
return errors.Trace(err)
}
if len(result) == 0 {
return nil
}
if isCluster {
p := client.Pipeline()
for _, key := range result {
if err = p.Del(ctx, key).Err(); err != nil {
if len(result) > 0 {
if isCluster {
p := client.Pipeline()
for _, key := range result {
if err = p.Del(ctx, key).Err(); err != nil {
return errors.Trace(err)
}
}
if _, err = p.Exec(ctx); err != nil {
return errors.Trace(err)
}
} else {
if err = client.Del(ctx, result...).Err(); err != nil {
return errors.Trace(err)
}
}
if _, err = p.Exec(ctx); err != nil {
return errors.Trace(err)
}
} else {
if err = client.Del(ctx, result...).Err(); err != nil {
return errors.Trace(err)
}
}
if cursor == 0 {
Expand Down
33 changes: 19 additions & 14 deletions storage/data/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ func getItemStream(t *testing.T, db Database, batchSize int) []Item {
return items
}

func getFeedback(t *testing.T, db Database, batchSize int, feedbackTypes ...string) []Feedback {
func getFeedback(t *testing.T, db Database, batchSize int, timeLimit *time.Time, feedbackTypes ...string) []Feedback {
feedback := make([]Feedback, 0)
var err error
var data []Feedback
cursor := ""
for {
cursor, data, err = db.GetFeedback(cursor, batchSize, nil, feedbackTypes...)
cursor, data, err = db.GetFeedback(cursor, batchSize, timeLimit, feedbackTypes...)
assert.NoError(t, err)
feedback = append(feedback, data...)
if cursor == "" {
Expand All @@ -118,9 +118,9 @@ func getFeedback(t *testing.T, db Database, batchSize int, feedbackTypes ...stri
}
}

func getFeedbackStream(t *testing.T, db Database, batchSize int, feedbackTypes ...string) []Feedback {
func getFeedbackStream(t *testing.T, db Database, batchSize int, timeLimit *time.Time, feedbackTypes ...string) []Feedback {
var feedbacks []Feedback
feedbackChan, errChan := db.GetFeedbackStream(batchSize, nil, feedbackTypes...)
feedbackChan, errChan := db.GetFeedbackStream(batchSize, timeLimit, feedbackTypes...)
for batchFeedback := range feedbackChan {
feedbacks = append(feedbacks, batchFeedback...)
}
Expand Down Expand Up @@ -200,12 +200,13 @@ func testFeedback(t *testing.T, db Database) {
err = db.BatchInsertItems([]Item{{ItemId: "0", Labels: []string{"b"}, Timestamp: time.Date(1996, 4, 8, 10, 0, 0, 0, time.UTC)}})
assert.NoError(t, err)
// insert feedbacks
timestamp := time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC)
feedback := []Feedback{
{FeedbackKey{positiveFeedbackType, "0", "8"}, time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC), "comment"},
{FeedbackKey{positiveFeedbackType, "1", "6"}, time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC), "comment"},
{FeedbackKey{positiveFeedbackType, "2", "4"}, time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC), "comment"},
{FeedbackKey{positiveFeedbackType, "3", "2"}, time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC), "comment"},
{FeedbackKey{positiveFeedbackType, "4", "0"}, time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC), "comment"},
{FeedbackKey{positiveFeedbackType, "0", "8"}, timestamp, "comment"},
{FeedbackKey{positiveFeedbackType, "1", "6"}, timestamp, "comment"},
{FeedbackKey{positiveFeedbackType, "2", "4"}, timestamp, "comment"},
{FeedbackKey{positiveFeedbackType, "3", "2"}, timestamp, "comment"},
{FeedbackKey{positiveFeedbackType, "4", "0"}, timestamp, "comment"},
}
err = db.BatchInsertFeedback(feedback, true, true, true)
assert.NoError(t, err)
Expand All @@ -225,15 +226,19 @@ func testFeedback(t *testing.T, db Database) {
err = db.BatchInsertFeedback(futureFeedback, true, true, true)
assert.NoError(t, err)
// Get feedback
ret := getFeedback(t, db, 3, positiveFeedbackType)
ret := getFeedback(t, db, 3, nil, positiveFeedbackType)
assert.Equal(t, feedback, ret)
ret = getFeedback(t, db, 2)
ret = getFeedback(t, db, 2, nil)
assert.Equal(t, len(feedback)+2, len(ret))
ret = getFeedback(t, db, 2, lo.ToPtr(timestamp.Add(time.Second)))
assert.Empty(t, ret)
// Get feedback stream
feedbackFromStream := getFeedbackStream(t, db, 3, positiveFeedbackType)
feedbackFromStream := getFeedbackStream(t, db, 3, nil, positiveFeedbackType)
assert.ElementsMatch(t, feedback, feedbackFromStream)
feedbackFromStream = getFeedbackStream(t, db, 3)
feedbackFromStream = getFeedbackStream(t, db, 3, nil)
assert.Equal(t, len(feedback)+2, len(feedbackFromStream))
feedbackFromStream = getFeedbackStream(t, db, 3, lo.ToPtr(timestamp.Add(time.Second)))
assert.Empty(t, feedbackFromStream)
// Get items
err = db.Optimize()
assert.NoError(t, err)
Expand Down Expand Up @@ -628,7 +633,7 @@ func testTimeZone(t *testing.T, db Database) {
}, true, true, true)
assert.NoError(t, err)
// get feedback stream
feedback := getFeedback(t, db, 10)
feedback := getFeedback(t, db, 10, nil)
assert.Equal(t, 3, len(feedback))
// get feedback
_, feedback, err = db.GetFeedback("", 10, nil)
Expand Down
10 changes: 6 additions & 4 deletions storage/data/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,6 @@ func (db *MongoDB) GetFeedback(cursor string, n int, timeLimit *time.Time, feedb
opt.SetLimit(int64(n))
opt.SetSort(bson.D{{"feedbackkey", 1}})
filter := make(bson.M)
filter["timestamp"] = bson.M{"$lte": time.Now()}
// pass cursor to filter
if cursor != "" {
feedbackKey, err := feedbackKeyFromString(cursor)
Expand All @@ -625,9 +624,11 @@ func (db *MongoDB) GetFeedback(cursor string, n int, timeLimit *time.Time, feedb
filter["feedbackkey.feedbacktype"] = bson.M{"$in": feedbackTypes}
}
// pass time limit to filter
timestampConditions := bson.M{"$lte": time.Now()}
if timeLimit != nil {
filter["timestamp"] = bson.M{"$gt": *timeLimit}
timestampConditions["$gt"] = *timeLimit
}
filter["timestamp"] = timestampConditions
r, err := c.Find(ctx, filter, opt)
if err != nil {
return "", nil, err
Expand Down Expand Up @@ -664,15 +665,16 @@ func (db *MongoDB) GetFeedbackStream(batchSize int, timeLimit *time.Time, feedba
c := db.client.Database(db.dbName).Collection(db.FeedbackTable())
opt := options.Find()
filter := make(bson.M)
filter["timestamp"] = bson.M{"$lte": time.Now()}
// pass feedback type to filter
if len(feedbackTypes) > 0 {
filter["feedbackkey.feedbacktype"] = bson.M{"$in": feedbackTypes}
}
// pass time limit to filter
timestampConditions := bson.M{"$lte": time.Now()}
if timeLimit != nil {
filter["timestamp"] = bson.M{"$gt": *timeLimit}
timestampConditions["$gt"] = *timeLimit
}
filter["timestamp"] = timestampConditions
r, err := c.Find(ctx, filter, opt)
if err != nil {
errChan <- errors.Trace(err)
Expand Down

0 comments on commit 0ce9ff6

Please sign in to comment.