-
Notifications
You must be signed in to change notification settings - Fork 3
/
messaging.go
76 lines (63 loc) · 1.82 KB
/
messaging.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package internal
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/WelcomerTeam/Discord/discord"
messaging "github.com/WelcomerTeam/Sandwich-Daemon/messaging"
sandwich_structs "github.com/WelcomerTeam/Sandwich-Daemon/structs"
)
// MQClient is the contract every message-queue backend must satisfy so that
// it can be used as a producer for published payloads.
type MQClient interface {
	// Connect prepares the client for use. clientName and args are passed
	// through to the backend; their exact meaning is backend-specific.
	Connect(ctx context.Context, clientName string, args map[string]interface{}) error

	// Publish sends data to the given channel.
	Publish(ctx context.Context, channel string, data []byte) error

	// Channel reports a channel name for the client.
	// NOTE(review): presumably the channel configured during Connect — confirm
	// against the backend implementations.
	Channel() string

	// String returns a textual description of the client.
	String() string

	// TODO: add a method for closing the client cleanly.
}
// NewMQClient returns a zero-value client for the backend named by mqType.
// Recognised names are "jetstream", "kafka" and "redis"; any other value
// yields a nil client and an error naming the rejected type.
//
// The returned client is only constructed, not connected.
func NewMQClient(mqType string) (MQClient, error) {
	var client MQClient

	switch mqType {
	case "jetstream":
		client = &messaging.JetStreamMQClient{}
	case "kafka":
		client = &messaging.KafkaMQClient{}
	case "redis":
		client = &messaging.RedisMQClient{}
	default:
		// Unknown backend: report it rather than returning a nil client silently.
		return nil, fmt.Errorf("%s is not a valid MQClient", mqType)
	}

	return client, nil
}
// PublishEvent stamps packet with this shard's metadata and a "publish" trace
// timestamp, encodes it as JSON, and publishes the result through the
// manager's producer client on the configured channel.
//
// packet is mutated in place: its Metadata is overwritten and its Trace gains
// a "publish" entry holding the current Unix time in seconds.
//
// An error is returned if JSON encoding fails or if the producer client
// rejects the publish; both wrap the underlying error.
func (sh *Shard) PublishEvent(ctx context.Context, packet *sandwich_structs.SandwichPayload) error {
	// Copy out everything read under the configuration lock in one short
	// critical section, so the lock is not held while marshalling or publishing.
	sh.Manager.configurationMu.RLock()
	producerID := sh.Manager.Configuration.ProducerIdentifier
	targetChannel := sh.Manager.Configuration.Messaging.ChannelName
	appName := sh.Manager.Identifier.Load()
	sh.Manager.configurationMu.RUnlock()

	botUserID := sh.Manager.UserID.Load()

	// Shard triple: shard group ID, shard ID, shard count of the group.
	shardInfo := [3]int32{
		sh.ShardGroup.ID,
		sh.ShardID,
		sh.ShardGroup.ShardCount,
	}

	packet.Metadata = sandwich_structs.SandwichMetadata{
		Version:       VERSION,
		Identifier:    producerID,
		Application:   appName,
		ApplicationID: discord.Snowflake(botUserID),
		Shard:         shardInfo,
	}

	// NOTE(review): if Trace is a map type this write panics when it is nil —
	// confirm that callers always initialise packet.Trace.
	packet.Trace["publish"] = discord.Int64(time.Now().Unix())

	encoded, err := json.Marshal(packet)
	if err != nil {
		return fmt.Errorf("failed to marshal payload: %w", err)
	}

	if err := sh.Manager.ProducerClient.Publish(ctx, targetChannel, encoded); err != nil {
		return fmt.Errorf("publishEvent publish: %w", err)
	}

	return nil
}