This repository has been archived by the owner on Sep 15, 2021. It is now read-only.
/
message_to_curl.go
138 lines (114 loc) · 4.13 KB
/
message_to_curl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/samuel/go-zookeeper/zk"
"github.com/wvanbergen/kazoo-go"
)
// Command-line options. init binds each variable to a flag and main reads
// them after flag.Parse.
var (
	// Which message to fetch.
	zkAddr    string // -zk: ZooKeeper address used to look up the Kafka brokers
	topic     string // -topic: topic name
	partition int    // -partition: partition id
	offset    int64  // -offset: offset number

	// How to turn the message into a curl command.
	serializer  string // -serializer: "raw" copies the payload, anything else decodes it as JSON
	contentType string // -type: when non-empty, overrides the Content-Type header
	callback    string // -callback: URL the generated curl command targets
)
// MessageBody is the envelope main decodes a Kafka message value into when
// the JSON serializer is selected. With the "raw" serializer only Data is
// filled in (with the unmodified payload).
type MessageBody struct {
	// Topic is decoded from the envelope; main reports the topic of the
	// consumed Kafka message instead of this field.
	Topic string `json:"Topic"`
	// PartitionKey is emitted as the X-Kmq-Partition-Key header.
	PartitionKey string `json:"PartitionKey"`
	// TimeStamp is emitted as the X-Kmq-Timestamp header.
	TimeStamp int64 `json:"TimeStamp"`
	// Data becomes the request body of the generated curl command.
	Data string `json:"Data"`
	// LogId is emitted as the X-Kmq-Logid header.
	LogId string `json:"LogId"`
	// ContentType is emitted as the Content-Type header unless the -type
	// flag overrides it.
	ContentType string `json:"ContentType"`
}
// init registers the command-line flags on the default flag set and points
// zk.DefaultLogger at a logger whose output is discarded.
func init() {
	// All string-valued options share one registration path.
	stringFlags := []struct {
		target       *string
		name         string
		defaultValue string
		usage        string
	}{
		{&zkAddr, "zk", "127.0.0.1:2181", "zookeeper address which kafka belongs to"},
		{&topic, "topic", "", "topic name"},
		{&serializer, "serializer", "raw", "message serializer name"},
		{&contentType, "type", "", "message content type"},
		{&callback, "callback", "http://url_to_replace", "callback url to build"},
	}
	for _, f := range stringFlags {
		flag.StringVar(f.target, f.name, f.defaultValue, f.usage)
	}

	// -1 marks "not given"; main rejects negative values for both options.
	flag.IntVar(&partition, "partition", -1, "partition id")
	flag.Int64Var(&offset, "offset", -1, "offset number")

	// Anything written through zk.DefaultLogger is thrown away.
	silent := log.New(ioutil.Discard, "[Zookeeper] ", log.LstdFlags)
	zk.DefaultLogger = silent
}
// escapeShellArg returns arg wrapped in single quotes so that it can be pasted
// into a shell command line as one word. Every single quote inside arg is
// rewritten as '\'' (close the quoted section, emit a backslash-escaped
// quote, reopen the quoted section).
func escapeShellArg(arg string) string {
	const quote = "'"
	// Splitting on the quote and re-joining with the escape sequence replaces
	// every occurrence; an empty arg yields a pair of quotes.
	escaped := strings.Join(strings.Split(arg, quote), `'\''`)
	return quote + escaped + quote
}
// main fetches one Kafka message and prints a curl command that replays it.
//
// Steps: parse and validate the flags, read the broker list from ZooKeeper,
// consume the message at the requested topic/partition/offset (waiting at
// most 10 seconds), decode it according to -serializer, and print to stdout a
// curl invocation that sends the message data and headers to the -callback
// URL. Every failure aborts the program with a panic.
func main() {
	flag.Parse()

	// -partition and -offset default to -1, so a negative value means the
	// option was not supplied.
	if topic == "" {
		panic("topic should not be empty")
	}
	if partition < 0 {
		panic("partition should be specified")
	}
	if offset < 0 {
		panic("offset should be specified")
	}

	// Discover the Kafka brokers through ZooKeeper.
	kazooConf := kazoo.NewConfig()
	kazooInstance, err := kazoo.NewKazooFromConnectionString(zkAddr, kazooConf)
	if err != nil {
		panic(fmt.Sprintf("connect zookeeper failed: %v", err))
	}
	defer kazooInstance.Close()

	brokers, err := kazooInstance.BrokerList()
	if err != nil {
		panic(fmt.Sprintf("read broker list from zookeeper failed: %v", err))
	}

	// Open a consumer positioned exactly at the requested offset.
	saramaConfig := sarama.NewConfig()
	saramaConsumer, err := sarama.NewConsumer(brokers, saramaConfig)
	if err != nil {
		panic(fmt.Sprintf("connect to kafka failed: %v", err))
	}
	defer saramaConsumer.Close()

	partitionConsumer, err := saramaConsumer.ConsumePartition(topic, int32(partition), offset)
	if err != nil {
		panic(fmt.Sprintf("consume message failed, could not get partition consumer: %v", err))
	}
	defer partitionConsumer.Close()

	var message *sarama.ConsumerMessage
	select {
	case message = <-partitionConsumer.Messages():
		// A receive from a closed channel yields nil; fail with a clear
		// message instead of a nil-pointer dereference further down.
		if message == nil {
			panic("message channel closed before a message was received")
		}
	case <-time.After(10 * time.Second):
		panic("message not received after 10 seconds")
	}

	// Decode the payload.
	var messageBody MessageBody
	switch serializer {
	case "raw":
		messageBody.Data = string(message.Value)
	case "json":
		fallthrough
	default:
		// Any serializer name other than "raw" is treated as JSON. A decode
		// failure is fatal: continuing would print a curl command with an
		// empty body and empty headers.
		if err := json.Unmarshal(message.Value, &messageBody); err != nil {
			panic(fmt.Sprintf("decode message as json failed: %v", err))
		}
	}

	// Content type precedence: -type flag, then the decoded envelope, then
	// the form-encoded default.
	if contentType != "" {
		messageBody.ContentType = contentType
	} else if messageBody.ContentType == "" {
		messageBody.ContentType = "application/x-www-form-urlencoded"
	}

	// Headers in the order they appear on the command line.
	headers := []string{
		fmt.Sprintf("Content-Type: %s", messageBody.ContentType),
		"User-Agent: Taiji pusher consumer(go)/v2.0.0-simulator",
		"X-Retry-Times: 0",
		fmt.Sprintf("X-Kmq-Topic: %s", message.Topic),
		fmt.Sprintf("X-Kmq-Partition: %d", message.Partition),
		fmt.Sprintf("X-Kmq-Partition-Key: %s", messageBody.PartitionKey),
		fmt.Sprintf("X-Kmq-Offset: %d", message.Offset),
		fmt.Sprintf("X-Kmq-Logid: %s", messageBody.LogId),
		fmt.Sprintf("X-Kmq-Timestamp: %d", messageBody.TimeStamp),
		// NOTE(review): unlike the other entries there is no ": " after
		// "Meilishuo" — kept verbatim; confirm the receiver expects this.
		"Meilishuo uid:0;ip:0.0.0.0;v:0;master:0",
	}

	// Build the curl command; every argument is shell-quoted.
	curlCommand := []string{"curl", escapeShellArg(callback)}
	for _, header := range headers {
		curlCommand = append(curlCommand, "-H", escapeShellArg(header))
	}
	// NOTE(review): curl's --data may treat a value starting with '@' as a
	// file name — verify whether --data-raw is wanted here.
	curlCommand = append(curlCommand, "--data", escapeShellArg(messageBody.Data))

	fmt.Println(strings.Join(curlCommand, " "))
}