Skip to content

Commit

Permalink
Add support for guild chunking and chunking on startup.
Browse files Browse the repository at this point in the history
  • Loading branch information
TheRockettek committed Jan 29, 2024
1 parent b081686 commit 6d7e9ae
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 45 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
loggingCompress := flag.Bool("compress", MustParseBool(os.Getenv("LOGGING_COMPRESS")), "If true, will compress log files once reached max size")
loggingDirectory := flag.String("directory", os.Getenv("LOGGING_DIRECTORY"), "Directory to store logs in")
loggingFilename := flag.String("filename", os.Getenv("LOGGING_FILENAME"), "Filename to store logs as")
loggingMaxSize := flag.Int("maxSize", MustParseInt(os.Getenv("LOGGING_MAX_SIZE")), "Maximum size for log files before being split into seperate files")
loggingMaxSize := flag.Int("maxSize", MustParseInt(os.Getenv("LOGGING_MAX_SIZE")), "Maximum size for log files before being split into separate files")
loggingMaxBackups := flag.Int("maxBackups", MustParseInt(os.Getenv("LOGGING_MAX_BACKUPS")), "Maximum number of log files before being deleted")
loggingMaxAge := flag.Int("maxAge", MustParseInt(os.Getenv("LOGGING_MAX_AGE")), "Maximum age in days for a log file")

Expand Down
46 changes: 39 additions & 7 deletions internal/events_dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ func OnReady(ctx *StateCtx, msg discord.GatewayPayload, trace sandwich_structs.S

var readyPayload discord.Ready

var guildCreatePayload discord.GuildCreate

err = ctx.decodeContent(msg, &readyPayload)
if err != nil {
return result, false, err
Expand Down Expand Up @@ -67,11 +65,6 @@ ready:
if msg.Type == discord.DiscordEventGuildCreate {
guildCreateEvents++

err = ctx.decodeContent(msg, &guildCreatePayload)
if err != nil {
ctx.Logger.Error().Err(err).Str("type", msg.Type).Msg("Failed to decode event")
}

readyTimeout.Reset(ReadyTimeout)
}

Expand All @@ -93,6 +86,14 @@ ready:

ctx.SetStatus(sandwich_structs.ShardStatusReady)

ctx.Manager.configurationMu.RLock()
chunkGuildOnStartup := ctx.Manager.Configuration.Bot.ChunkGuildsOnStartup
ctx.Manager.configurationMu.RUnlock()

if chunkGuildOnStartup {
ctx.Shard.ChunkAllGuilds()
}

return result, false, nil
}

Expand Down Expand Up @@ -202,6 +203,10 @@ func OnGuildMembersChunk(ctx *StateCtx, msg discord.GatewayPayload, trace sandwi
return result, false, err
}

// Force caching of users and members.
ctx.CacheUsers = true
ctx.CacheMembers = true

for _, member := range guildMembersChunkPayload.Members {
ctx.Sandwich.State.SetGuildMember(ctx, guildMembersChunkPayload.GuildID, member)
}
Expand All @@ -213,6 +218,33 @@ func OnGuildMembersChunk(ctx *StateCtx, msg discord.GatewayPayload, trace sandwi
Int64("guildID", int64(guildMembersChunkPayload.GuildID)).
Msg("Chunked guild members")

ctx.Sandwich.guildChunksMu.RLock()
guildChunk, ok := ctx.Sandwich.guildChunks[guildMembersChunkPayload.GuildID]
ctx.Sandwich.guildChunksMu.RUnlock()

if !ok {
ctx.Logger.Warn().
Int64("guildID", int64(guildMembersChunkPayload.GuildID)).
Msg("Received guild member chunk, but there is no record in the GuildChunks map")

return result, false, nil
}

if guildChunk.Complete.Load() {
ctx.Logger.Warn().
Int64("guildID", int64(guildMembersChunkPayload.GuildID)).
Msg("GuildChunks entry is marked as complete, but we received a guild member chunk")
}

select {
case guildChunk.ChunkingChannel <- GuildChunkPartial{
ChunkIndex: guildMembersChunkPayload.ChunkIndex,
ChunkCount: guildMembersChunkPayload.ChunkCount,
Nonce: guildMembersChunkPayload.Nonce,
}:
default:
}

return result, false, nil
}

Expand Down
26 changes: 21 additions & 5 deletions internal/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,14 +584,30 @@ func (grpc *routeSandwichServer) FetchMutualGuilds(ctx context.Context, request
func (grpc *routeSandwichServer) RequestGuildChunk(ctx context.Context, request *pb.RequestGuildChunkRequest) (response *pb.BaseResponse, err error) {
onGRPCRequest()

response = &pb.BaseResponse{
Ok: false,
Error: "Not implemented",
grpc.sg.managersMu.RLock()
for _, manager := range grpc.sg.Managers {
manager.shardGroupsMu.RLock()
for _, shardGroup := range manager.ShardGroups {
shardGroup.shardsMu.RLock()
for _, shard := range shardGroup.Shards {
shard.guildsMu.RLock()
if _, ok := shard.Guilds[discord.Snowflake(request.GuildId)]; ok {
err = shard.ChunkGuild(discord.Snowflake(request.GuildId))
if err != nil {
response.Error = err.Error()
}
}
shard.guildsMu.RUnlock()
}
shardGroup.shardsMu.RUnlock()
}
manager.shardGroupsMu.RUnlock()
}
grpc.sg.managersMu.RUnlock()

// TODO
response.Ok = true

return response, status.Errorf(codes.Unimplemented, "method RequestGuildChunk not implemented")
return response, err
}

// SendWebsocketMessage manually sends a websocket message.
Expand Down
1 change: 0 additions & 1 deletion internal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ type ManagerConfiguration struct {
DefaultPresence discord.UpdateStatus `json:"default_presence" yaml:"default_presence"`
Intents int32 `json:"intents" yaml:"intents"`
ChunkGuildsOnStartup bool `json:"chunk_guilds_on_startup" yaml:"chunk_guilds_on_startup"`
// TODO: Guild chunking
} `json:"bot" yaml:"bot"`

Caching struct {
Expand Down
25 changes: 24 additions & 1 deletion internal/sandwich.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

// VERSION follows semantic versioning.
const VERSION = "1.9.1"
const VERSION = "1.10"

const (
PermissionsDefault = 0o744
Expand Down Expand Up @@ -109,6 +109,9 @@ type Sandwich struct {
receivedPool sync.Pool
// SentPayload pool
sentPool sync.Pool

guildChunksMu sync.RWMutex
guildChunks map[discord.Snowflake]*GuildChunks
}

// SandwichConfiguration represents the configuration file.
Expand Down Expand Up @@ -157,6 +160,23 @@ type SandwichOptions struct {
HTTPEnabled bool `json:"http_enabled" yaml:"http_enabled"`
}

type GuildChunks struct {
// Indicates if all chunks have been received.
Complete atomic.Bool

// Channel for receiving when chunks have been received.
ChunkingChannel chan GuildChunkPartial

StartedAt atomic.Time
CompletedAt atomic.Time
}

type GuildChunkPartial struct {
ChunkIndex int32
ChunkCount int32
Nonce string
}

// NewSandwich creates the application state and initializes it.
func NewSandwich(logger io.Writer, options SandwichOptions) (sg *Sandwich, err error) {
sg = &Sandwich{
Expand Down Expand Up @@ -201,6 +221,9 @@ func NewSandwich(logger io.Writer, options SandwichOptions) (sg *Sandwich, err e
sentPool: sync.Pool{
New: func() interface{} { return new(discord.SentPayload) },
},

guildChunksMu: sync.RWMutex{},
guildChunks: make(map[discord.Snowflake]*GuildChunks),
}

sg.ctx, sg.cancel = context.WithCancel(context.Background())
Expand Down
116 changes: 110 additions & 6 deletions internal/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@ const (

MessageChannelBuffer = 64

// Time necessary to abort chunking when no events have been received yet in this time frame.
InitialMemberChunkTimeout = 10 * time.Second
// Time necessary to mark chunking as completed when no more events are received in this time frame.
MemberChunkTimeout = 1 * time.Second
// Time between chunks no longer marked as chunked anymore.
ChunkStatePersistTimeout = 10 * time.Second

// Number of retries attempted before considering a shard not working.
ShardConnectRetries = 3
Expand Down Expand Up @@ -80,7 +76,7 @@ type Shard struct {
Heartbeater *time.Ticker `json:"-"`
HeartbeatInterval time.Duration `json:"-"`

// Duration since last heartbeat Ack beforereconnecting.
// Duration since last heartbeat Ack before reconnecting.
HeartbeatFailureInterval time.Duration `json:"-"`

// Map of guilds that are currently unavailable.
Expand All @@ -91,7 +87,7 @@ type Shard struct {
lazyMu sync.RWMutex
Lazy map[discord.Snowflake]bool `json:"lazy"`

// Stores a local list of all guilds in the shard.d
// Stores a local list of all guilds in the shard.
guildsMu sync.RWMutex
Guilds map[discord.Snowflake]bool `json:"guilds"`

Expand Down Expand Up @@ -892,6 +888,114 @@ func (sh *Shard) Reconnect(code websocket.StatusCode) error {
}
}

func (sh *Shard) ChunkAllGuilds() {
sh.guildsMu.RLock()

guilds := make([]discord.Snowflake, len(sh.Guilds))
i := 0

for guildID := range sh.Guilds {
guilds[i] = guildID
i++
}

sh.guildsMu.RUnlock()

sh.Logger.Info().Int("guilds", len(guilds)).Msg("Started chunking all guilds")

for _, guildID := range guilds {
sh.ChunkGuild(guildID)
}

sh.Logger.Info().Int("guilds", len(guilds)).Msg("Finished chunking all guilds")
}

// ChunkGuilds chunks guilds to discord. It will wait for the operation to complete, or timeout.
func (sh *Shard) ChunkGuild(guildID discord.Snowflake) error {
sh.Sandwich.guildChunksMu.RLock()
guildChunk, ok := sh.Sandwich.guildChunks[guildID]
sh.Sandwich.guildChunksMu.RUnlock()

if !ok {
guildChunk = &GuildChunks{
Complete: *atomic.NewBool(false),
ChunkingChannel: make(chan GuildChunkPartial),
StartedAt: *atomic.NewTime(time.Time{}),
CompletedAt: *atomic.NewTime(time.Time{}),
}

sh.Sandwich.guildChunksMu.Lock()
sh.Sandwich.guildChunks[guildID] = guildChunk
sh.Sandwich.guildChunksMu.Unlock()
}

guildChunk.Complete.Store(false)
guildChunk.StartedAt.Store(time.Now())

nonce := randomHex(16)

err := sh.SendEvent(sh.ctx, discord.GatewayOpRequestGuildMembers, discord.RequestGuildMembers{
GuildID: guildID,
Nonce: nonce,
})
if err != nil {
return fmt.Errorf("failed to send request guild members event: %w", err)
}

chunksReceived := int32(0)
totalChunks := int32(0)

timeout := time.NewTimer(MemberChunkTimeout)

guildChunkLoop:
for {
select {
case guildChunkPartial := <-guildChunk.ChunkingChannel:
if guildChunkPartial.Nonce != nonce {
continue
}

chunksReceived++
totalChunks = guildChunkPartial.ChunkCount

// When receiving a chunk, reset the timeout.
timeout.Reset(MemberChunkTimeout)

sh.Logger.Debug().
Int64("guild_id", int64(guildID)).
Int32("chunk_index", guildChunkPartial.ChunkIndex).
Int32("chunk_count", guildChunkPartial.ChunkCount).
Msg("Received guild member chunk")

if chunksReceived >= totalChunks {
sh.Logger.Debug().
Int64("guild_id", int64(guildID)).
Int32("total_chunks", totalChunks).
Msg("Received all guild member chunks")

break guildChunkLoop
}
case <-timeout.C:
// We have timed out. We will mark the chunking as complete.

sh.Logger.Warn().
Int64("guild_id", int64(guildID)).
Int32("chunks_received", chunksReceived).
Int32("total_chunks", totalChunks).
Msg("Timed out receiving guild member chunks")

break guildChunkLoop
}
}

timeout.Stop()

guildChunk.Complete.Store(true)
guildChunk.CompletedAt.Store(time.Now())

return nil
}

// OnDispatchEvent is called during the dispatch event to call analytics.
func (sh *Shard) OnDispatchEvent(eventType string) {
sh.OnGuildDispatchEvent(eventType, discord.Snowflake(0))
Expand Down
24 changes: 0 additions & 24 deletions internal/shardgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,6 @@ type ShardGroup struct {
statusMu sync.RWMutex `json:"-"`
Status sandwich_structs.ShardGroupStatus `json:"status"`

// MemberChunksCallback is used to signal when a guild is chunking.
memberChunksCallbackMu sync.RWMutex `json:"-"`
MemberChunksCallback map[discord.Snowflake]*sync.WaitGroup `json:"-"`

// MemberChunksComplete is used to signal if a guild has recently
// been chunked. It is up to the guild task to remove this bool
// a few seconds after finishing chunking.
memberChunksCompleteMu sync.RWMutex `json:"-"`
MemberChunksComplete map[discord.Snowflake]*atomic.Bool `json:"-"`

// MemberChunkCallbacks is used to signal when any MEMBER_CHUNK
// events are received for the specific guild.
memberChunkCallbacksMu sync.RWMutex `json:"-"`
MemberChunkCallbacks map[discord.Snowflake]chan bool `json:"-"`

// Used to override when events can be processed.
// Used to orchestrate scaling of shardgroups.
floodgateMu sync.RWMutex
Expand Down Expand Up @@ -91,15 +76,6 @@ func (mg *Manager) NewShardGroup(shardGroupID int32, shardIDs []int32, shardCoun
statusMu: sync.RWMutex{},
Status: sandwich_structs.ShardGroupStatusIdle,

memberChunksCallbackMu: sync.RWMutex{},
MemberChunksCallback: make(map[discord.Snowflake]*sync.WaitGroup),

memberChunksCompleteMu: sync.RWMutex{},
MemberChunksComplete: make(map[discord.Snowflake]*atomic.Bool),

memberChunkCallbacksMu: sync.RWMutex{},
MemberChunkCallbacks: make(map[discord.Snowflake]chan bool),

floodgate: false,
floodgateMu: sync.RWMutex{},
}
Expand Down

0 comments on commit 6d7e9ae

Please sign in to comment.