-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.go
158 lines (141 loc) · 4.28 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package example
import (
"context"
"errors"
"fmt"
"math/rand"
"os"
"strings"
"sync"
"github.com/visforest/windy/core"
)
// Email is the payload that SendEmail and BatchSendEmail parse out of a
// queue message.
type Email struct {
// Receivers lists the recipients of a batch email; BatchSendEmail joins
// them with ";" when printing.
Receivers []string
// Receiver is the single recipient printed by SendEmail.
Receiver string
// Subject is the subject line of the email.
Subject string
// Content is the body text printed by SendEmail and BatchSendEmail.
Content string
}
// Emails is sample data: five single-recipient emails (Receiver set) and one
// batch email (Receivers set).
//
// NOTE(review): the last entry's Subject reads "%70 discount!" while its
// Content reads "70% discount" — confirm which spelling is intended.
var Emails = []Email{
{
Receiver: "wind@example.com",
Subject: "Award!",
Content: "You got a $15 award.",
},
{
Receiver: "rain@example.com",
Subject: "Award!",
Content: "You got a $15 award.",
},
{
Receiver: "wind@example.com",
Subject: "Award!",
Content: "You got a $20 award.",
},
{
// The only batch entry: uses Receivers instead of Receiver.
Receivers: []string{"snow@example.com", "cloud@example.com"},
Subject: "Join your 2024 contest",
Content: "Making new friends? Looking for a new job? Join the 2024 algorithm contest with over 200 coders!",
},
{
Receiver: "cloud@example.com",
Subject: "New message from your friend",
Content: "Your friend Susan sent a message minutes ago.",
},
{
Receiver: "storm@example.com",
Subject: "%70 discount!",
Content: "Upgrade to VIP, and you're going to have 70% discount every Friday!",
},
}
// MyProduceListener is a sample producer-side listener. It holds no state.
// NOTE(review): presumably satisfies a listener interface declared in the
// windy core package — confirm against that package.
type MyProduceListener struct{}

// PrepareSend prints err to stdout when it is non-nil and does nothing
// otherwise.
func (l *MyProduceListener) PrepareSend(ctx context.Context, topic string, msg *core.Msg, err error) {
	if err == nil {
		// Uncomment to trace every outgoing message:
		// channel := ctx.Value("channel").(string)
		// fmt.Printf("prepare to send msg %s to %s,channel: %s\n", msg.Id, topic, channel)
		return
	}
	fmt.Printf("get err before msg is sent:%s \n", err.Error())
}

// OnSendSucceed is intentionally a no-op; the tracing code is left commented
// out.
func (l *MyProduceListener) OnSendSucceed(ctx context.Context, topic string, msg *core.Msg) {
	// channel := ctx.Value("channel").(string)
	// fmt.Printf("sent msg %s to %s,channel: %s \n", msg.Id, topic, channel)
}

// OnSendFail is intentionally a no-op; the tracing code is left commented
// out.
func (l *MyProduceListener) OnSendFail(ctx context.Context, topic string, msg *core.Msg, err error) {
	// channel := ctx.Value("channel").(string)
	// fmt.Printf("failed to send msg %s to %s, %s,channel: %s \n", msg.Id, topic, err.Error(), channel)
}
// MyConsumerListener is a sample consumer-side listener that logs each
// consume event to stdout together with the IP stored in the context.
// NOTE(review): presumably satisfies a listener interface declared in the
// windy core package — confirm against that package.
type MyConsumerListener struct{}

// clientIP returns the string stored in ctx under the "myip" key, or
// "unknown" when the key is absent or holds a non-string value. Using the
// comma-ok assertion keeps the listener from panicking on a context that was
// not decorated with an IP.
func (l *MyConsumerListener) clientIP(ctx context.Context) string {
	if ip, ok := ctx.Value("myip").(string); ok {
		return ip
	}
	return "unknown"
}

// PrepareConsume prints err when it is non-nil; otherwise it logs the message
// id, the topic and the context IP.
func (l *MyConsumerListener) PrepareConsume(ctx context.Context, topic string, msg *core.Msg, err error) {
	if err != nil {
		fmt.Printf("get err before consume:%s \n", err.Error())
		return
	}
	fmt.Printf("consuming msg %s from %s,ip: %s\n", msg.Id, topic, l.clientIP(ctx))
}

// OnConsumeSucceed logs the message id, the topic and the context IP.
func (l *MyConsumerListener) OnConsumeSucceed(ctx context.Context, topic string, msg *core.Msg) {
	fmt.Printf("consume msg %s from %s,ip: %s \n", msg.Id, topic, l.clientIP(ctx))
}

// OnConsumeFail logs the message id, the topic, the failure and the context
// IP.
func (l *MyConsumerListener) OnConsumeFail(ctx context.Context, topic string, msg *core.Msg, err error) {
	fmt.Printf("failed to consume msg %s from %s, %s,ip: %s \n", msg.Id, topic, err.Error(), l.clientIP(ctx))
}
// MyIdCreator is a customized id creator that produces random alphanumeric
// ids.
type MyIdCreator struct{}

// Create returns a 10-character id whose characters are drawn from
// [a-zA-Z0-9] using math/rand's package-level source.
func (c *MyIdCreator) Create() string {
	const (
		alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
		idLen    = 10
	)
	id := make([]byte, 0, idLen)
	for len(id) < idLen {
		id = append(id, alphabet[rand.Intn(len(alphabet))])
	}
	return string(id)
}
// printMu serializes Print calls. It must live at package level: a mutex
// declared inside Print would be a fresh, private lock on every call and
// would therefore serialize nothing.
var printMu sync.Mutex

// Print is a sample handler that parses msg into an int via
// core.ParseFromMsg, writes "<topic>,got <n>" to stdout and appends the same
// line to test.txt in the working directory.
//
// It panics when the message cannot be parsed or the file cannot be opened.
// NOTE(review): the panics look deliberate (a `return err` alternative is
// left commented out next to each) — confirm before changing them.
// It returns the error from writing to or closing the file, if any.
func Print(ctx context.Context, topic string, msg *core.Msg) error {
	printMu.Lock()
	defer printMu.Unlock()

	var num int
	if err := core.ParseFromMsg(msg, &num); err != nil {
		panic(err)
		// return err
	}

	line := fmt.Sprintf("%s,got %d\n", topic, num)
	fmt.Print(line)

	f, err := os.OpenFile("test.txt", os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		panic(err)
		// return err
	}

	_, err = f.WriteString(line)
	// A failed Close on a file opened for writing can mean lost data, so
	// report it unless the write itself already failed.
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	return err
}
// SendEmail is a sample handler for single-recipient emails. It logs the
// start of the send (including the "myip" context value when one is
// present), parses msg into an Email and prints the receiver and content.
// It returns an error when the message cannot be parsed into an Email.
func SendEmail(ctx context.Context, topic string, msg *core.Msg) error {
	if ip, found := ctx.Value("myip").(string); found {
		fmt.Printf("start to send email from ip: %s,topic is %s \n", ip, topic)
	} else {
		fmt.Printf("start to send email,topic is %s \n", topic)
	}

	var mail Email
	if parseErr := core.ParseFromMsg(msg, &mail); parseErr != nil {
		return errors.New("got bad data from queue")
	}

	fmt.Printf("send to %s:%s \n", mail.Receiver, mail.Content)
	return nil
}
// BatchSendEmail is a sample handler for multi-recipient emails. It logs the
// start of the send (including the "myip" context value when one is
// present), parses msg into an Email and prints the ";"-joined receivers
// with the content. It returns an error when the message cannot be parsed
// into an Email.
func BatchSendEmail(ctx context.Context, topic string, msg *core.Msg) error {
	if ip, found := ctx.Value("myip").(string); found {
		fmt.Printf("start to send email from ip: %s,topic is %s \n", ip, topic)
	} else {
		fmt.Printf("start to send email,topic is %s \n", topic)
	}

	var mail Email
	if parseErr := core.ParseFromMsg(msg, &mail); parseErr != nil {
		return errors.New("got bad data from queue")
	}

	recipients := strings.Join(mail.Receivers, ";")
	fmt.Printf("send to %s:%s \n", recipients, mail.Content)
	return nil
}