Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/handler/engagement/engagement.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,18 @@ func (h *handler) IndexMessages(c *gin.Context) {
)
return
}
l.Debugf("fetched channels: %v", channels)

tx, done := h.repo.NewTransaction()
allMessages := make([]*discordgo.Message, 0)
// TODO: parallelize the code as each channel can be processed singly
for _, channel := range channels {
// TODO: parallelize the code as each channel can be processed singly
l := l.AddField("channelID", channel.ID)
if channel.LastMessageID == "" {
l.Debugf("channel has no message", channel.ID)
continue
}

l := l.AddField("channelID", channel.ID)
cursorMessageID, err := h.store.EngagementsRollup.GetLastMessageID(tx.DB(), channel.ID)
if err != nil {
l.Error(done(err), "get cursor message id error")
Expand All @@ -322,13 +324,10 @@ func (h *handler) IndexMessages(c *gin.Context) {
)
if err != nil {
l := l.AddField("cursorMessageID", cursorMessageID)
l.Error(err, "get messages after cursor error")
c.JSON(
http.StatusInternalServerError,
view.CreateResponse[any](nil, nil, err, nil, ""),
)
return
l.Warnf("get messages after cursor error: %s", err.Error())
continue
}
l.Debugf("fetched %d message(s)", len(messages))
allMessages = append(allMessages, messages...)
}

Expand All @@ -338,6 +337,7 @@ func (h *handler) IndexMessages(c *gin.Context) {
}

records := AggregateMessages(l, allMessages, channelIDToCategoryID)
l.Debugf("aggregated %d message(s) to %d records", len(allMessages), len(records))
for _, record := range records {
_, err := h.store.EngagementsRollup.Upsert(tx.DB(), record)
if err != nil {
Expand Down