forked from openimsdk/open-im-server
/
init.go
85 lines (79 loc) · 2.88 KB
/
init.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package logic
import (
"fmt"
"sync"
"github.com/erbaner/be/pkg/common/config"
"github.com/erbaner/be/pkg/common/constant"
"github.com/erbaner/be/pkg/common/kafka"
promePkg "github.com/erbaner/be/pkg/common/prometheus"
"github.com/erbaner/be/pkg/statistics"
)
const OnlineTopicBusy = 1
const OnlineTopicVacancy = 0
const Msg = 2
const ConsumerMsgs = 3
const AggregationMessages = 4
const MongoMessages = 5
const ChannelNum = 100
var (
persistentCH PersistentConsumerHandler
historyCH OnlineHistoryRedisConsumerHandler
historyMongoCH OnlineHistoryMongoConsumerHandler
modifyCH ModifyMsgConsumerHandler
producer *kafka.Producer
producerToModify *kafka.Producer
producerToMongo *kafka.Producer
cmdCh chan Cmd2Value
onlineTopicStatus int
w *sync.Mutex
singleMsgSuccessCount uint64
groupMsgCount uint64
singleMsgFailedCount uint64
singleMsgSuccessCountMutex sync.Mutex
)
func Init() {
cmdCh = make(chan Cmd2Value, 10000)
w = new(sync.Mutex)
if config.Config.Prometheus.Enable {
initPrometheus()
}
persistentCH.Init() // ws2mschat save mysql
historyCH.Init(cmdCh) //
historyMongoCH.Init()
modifyCH.Init()
onlineTopicStatus = OnlineTopicVacancy
//offlineHistoryCH.Init(cmdCh)
statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
producerToModify = kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic)
producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
}
func Run(promethuesPort int) {
//register mysqlConsumerHandler to
if config.Config.ChatPersistenceMysql {
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
} else {
fmt.Println("not start mysql consumer")
}
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH)
go modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(&modifyCH)
//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
go func() {
err := promePkg.StartPromeSrv(promethuesPort)
if err != nil {
panic(err)
}
}()
}
func SetOnlineTopicStatus(status int) {
w.Lock()
defer w.Unlock()
onlineTopicStatus = status
}
func GetOnlineTopicStatus() int {
w.Lock()
defer w.Unlock()
return onlineTopicStatus
}