forked from haxpax/gosms
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
124 lines (110 loc) · 3.66 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package gosms
import (
	"log"
	"sync"
	"time"
)
// Status codes for an SMS. They are untyped integer constants numbered from
// zero in declaration order (presumably the values stored in SMS.Status;
// confirm against the code that writes that field).
const (
	// SMSPending has the value 0.
	SMSPending = iota
	// SMSProcessed has the value 1.
	SMSProcessed
	// SMSError has the value 2.
	SMSError
)
// SMS is one text message as it travels through the worker queue. Every field
// is exported and carries a lower-case JSON key.
type SMS struct {
	// UUID is serialized as "uuid" (presumably the message's unique id).
	UUID string `json:"uuid"`
	// Mobile is serialized as "mobile" (presumably the destination number).
	Mobile string `json:"mobile"`
	// Body is serialized as "body" (presumably the message text).
	Body string `json:"body"`
	// Status is serialized as "status" (presumably one of SMSPending,
	// SMSProcessed or SMSError; confirm against the code that sets it).
	Status int `json:"status"`
	// Retries is serialized as "retries" (presumably a send-attempt counter).
	Retries int `json:"retries"`
	// Device is serialized as "device" (presumably the modem that handled it).
	Device string `json:"device"`
}
// Package-level worker state. InitWorker assigns every one of these; they are
// read (and some updated) by EnqueueMessage and messageLoader.
var (
	// messages is the buffered queue that messageLoader fills with pending SMS.
	messages chan SMS
	// wakeupMessageLoader carries wakeup signals to messageLoader; it is
	// created with a buffer of one.
	wakeupMessageLoader chan bool

	// bufferMaxSize is the capacity given to the messages channel.
	bufferMaxSize int
	// bufferLowCount is the fill level at or above which messageLoader skips
	// fetching from the database.
	bufferLowCount int

	// messageCountSinceLastWakeup counts EnqueueMessage calls since the last
	// wakeup signal was sent.
	messageCountSinceLastWakeup int
	// timeOfLastWakeup is when the last wakeup signal was sent.
	timeOfLastWakeup time.Time

	// messageLoaderTimeout is the elapsed time after which EnqueueMessage
	// sends a new wakeup signal.
	messageLoaderTimeout time.Duration
	// messageLoaderCountout is the call count above which EnqueueMessage sends
	// a new wakeup signal.
	messageLoaderCountout int
	// messageLoaderLongTimeout is how long messageLoader waits for a wakeup
	// signal before polling on its own.
	messageLoaderLongTimeout time.Duration
)
// InitWorker sets up the package-level queue state, connects the given modems
// and starts the background goroutines.
//
// bufferSize becomes the capacity of the messages channel and bufferLow the
// fill level below which the loader fetches more messages. loaderTimeout and
// loaderLongTimeout are interpreted as minutes. countOut is the number of
// enqueued messages after which the loader is woken up again.
//
// A modem whose Connect call fails is logged and skipped; every other modem
// gets its own ProcessMessages goroutine. Finally the messageLoader goroutine
// is started.
func InitWorker(modems []*GSMModem, bufferSize, bufferLow, loaderTimeout, countOut, loaderLongTimeout int) {
	log.Println("--- InitWorker")

	bufferMaxSize = bufferSize
	bufferLowCount = bufferLow
	messageLoaderCountout = countOut
	messageLoaderTimeout = time.Duration(loaderTimeout) * time.Minute
	messageLoaderLongTimeout = time.Duration(loaderLongTimeout) * time.Minute

	// The channels must exist before any modem goroutine is started: channel
	// operations on a nil channel block forever.
	messages = make(chan SMS, bufferMaxSize)
	wakeupMessageLoader = make(chan bool, 1)
	// Pre-load one wakeup so the loader runs once right after start-up.
	wakeupMessageLoader <- true

	messageCountSinceLastWakeup = 0
	// Backdate the last wakeup by one loader timeout to handle the cold-start
	// state of the system.
	timeOfLastWakeup = time.Now().Add(-messageLoaderTimeout)

	for _, modem := range modems {
		if err := modem.Connect(); err != nil {
			log.Println("InitWorker: error connecting", modem.Devid, err)
			continue
		}
		go modem.ProcessMessages()
	}

	go messageLoader(bufferMaxSize, bufferLowCount)
}
// wakeupStateMu guards messageCountSinceLastWakeup and timeOfLastWakeup, which
// EnqueueMessage updates and which may be reached from several goroutines.
var wakeupStateMu sync.Mutex

// EnqueueMessage registers a new message with the worker.
//
// When insertToDB is true the message is first handed to insertMessage. The
// call is then counted, and the message loader is signalled if more than
// messageLoaderCountout calls have been counted, or more than
// messageLoaderTimeout has elapsed, since the previous signal.
//
// The bookkeeping runs under wakeupStateMu so concurrent callers do not race
// on the counter and timestamp. The signal is sent without blocking: the
// wakeup channel holds one pending signal, and if one is already queued the
// loader is guaranteed to run again, so a second signal adds nothing. For the
// same reason no helper goroutine is needed and the function never blocks on
// the loader.
func EnqueueMessage(message *SMS, insertToDB bool) {
	log.Println("--- EnqueueMessage: ", message)
	if insertToDB {
		insertMessage(message)
	}

	wakeupStateMu.Lock()
	defer wakeupStateMu.Unlock()

	// Notify the message loader only if it has been too long, or too many
	// messages have arrived, since the last notification.
	messageCountSinceLastWakeup++
	if messageCountSinceLastWakeup > messageLoaderCountout || time.Since(timeOfLastWakeup) > messageLoaderTimeout {
		log.Println("EnqueueMessage: ", "waking up message loader")
		select {
		case wakeupMessageLoader <- true:
		default:
			// A wakeup is already pending (or the worker has not been
			// initialised yet); do not block the caller.
		}
		messageCountSinceLastWakeup = 0
		timeOfLastWakeup = time.Now()
	}
	log.Println("EnqueueMessage - anon: count since last wakeup: ", messageCountSinceLastWakeup)
}
// messageLoader runs forever, refilling the messages channel with pending
// messages from the database whenever it is woken up.
//
// bufferSize is the target fill level of the messages channel and minFill is
// the level at or above which no database fetch is made. The caller passes
// the same values that were used to create the channel.
//
// Each iteration waits for either a signal on wakeupMessageLoader or for
// messageLoaderLongTimeout to elapse. The long timeout exists because, when
// only a few messages are in the system and they failed on the first attempt,
// nothing may call EnqueueMessage again; without the timeout those messages
// would stall until the next API request. A long polling interval is
// affordable in that case.
func messageLoader(bufferSize, minFill int) {
	for {
		// Use a timer rather than a sleeping goroutine so that nothing is
		// left behind when the wakeup channel fires first.
		timer := time.NewTimer(messageLoaderLongTimeout)

		log.Println("messageLoader: ", "waiting for wakeup call")
		select {
		case <-wakeupMessageLoader:
			timer.Stop()
			log.Println("messageLoader: woken up by channel call")
		case <-timer.C:
			log.Println("messageLoader: woken up by timeout")
		}

		if len(messages) >= minFill {
			// Enough messages are already queued for processing; do not
			// bother hitting the database.
			log.Println("messageLoader: ", "I have sufficient messages")
			continue
		}

		countToFetch := bufferSize - len(messages)
		log.Println("messageLoader: ", "I need to fetch more messages", countToFetch)

		pendingMsgs, err := getPendingMessages(countToFetch)
		if err != nil {
			// Report the failure instead of dropping it silently; the next
			// wakeup or timeout retries the fetch.
			log.Println("messageLoader: ", "error fetching pending messages: ", err)
			continue
		}

		log.Println("messageLoader: ", len(pendingMsgs), " pending messages found")
		for _, msg := range pendingMsgs {
			messages <- msg
		}
	}
}