// Source: main.go (124 lines, 99 loc, 2.77 KB), from a GitHub repository forked from tryfix/kstream.
package main
import (
"context"
"flag"
"fmt"
"github.com/gmbyapa/kstream/v2/kafka"
"github.com/gmbyapa/kstream/v2/kafka/adaptors/librd"
"github.com/gmbyapa/kstream/v2/streams"
"github.com/gmbyapa/kstream/v2/streams/encoding"
"github.com/tryfix/log"
"os"
"os/signal"
"strings"
"time"
)
// bootstrapServers holds the value of the -bootstrap-servers flag: a
// comma-separated list of Kafka broker addresses. It is split on `,` by both
// main and seed.
var bootstrapServers = flag.String(`bootstrap-servers`, `192.168.0.101:9092`,
	`A comma separated list of Kafka Bootstrap Servers`)

// TopicNumbers is the name of the topic that seed writes to and that the
// topology built by buildTopology reads from.
const TopicNumbers = `numbers`
// main parses the command-line flags, seeds the numbers topic with test
// records, builds the branching topology, prints its description and then
// runs it. Receiving os.Interrupt asks the runner to stop. Failures while
// building or running the topology panic.
func main() {
	flag.Parse()

	config := streams.NewStreamBuilderConfig()
	config.BootstrapServers = strings.Split(*bootstrapServers, `,`)
	config.ApplicationId = `kstream-branching`
	config.Consumer.Offsets.Initial = kafka.OffsetEarliest

	// Produce the test records before the stream application is started.
	seed(config.Logger)

	builder := streams.NewStreamBuilder(config)
	buildTopology(builder)

	topology, err := builder.Build()
	if err != nil {
		panic(err)
	}

	println("Topology - \n", topology.Describe())

	runner := builder.NewRunner()

	// Buffered so that a signal delivered before the goroutine below is
	// receiving is not dropped.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt)

	go func() {
		<-sigs
		if err := runner.Stop(); err != nil {
			// The builtin println prints an interface value as a pair of
			// addresses rather than the error text, so format the error
			// explicitly. Stderr is used because the builtin println
			// writes there as well.
			fmt.Fprintln(os.Stderr, err)
		}
	}()

	if err := runner.Run(topology); err != nil {
		panic(err)
	}
}
// buildTopology registers a stream over TopicNumbers (values decoded with
// encoding.IntEncoder) on builder and splits it into two named branches,
// `odd` and `even`. Each branch prints the values routed to it.
//
// The branch predicates report a value that is not an int through their
// error return instead of panicking on a failed type assertion.
func buildTopology(builder *streams.StreamBuilder) {
	stream := builder.KStream(TopicNumbers, encoding.NoopEncoder{}, encoding.IntEncoder{})

	splitted := stream.Split()

	splitted.New(`odd`, func(ctx context.Context, key interface{}, val interface{}) (bool, error) {
		n, ok := val.(int)
		if !ok {
			return false, fmt.Errorf(`odd branch: expected int value, got %T`, val)
		}
		return n%2 != 0, nil
	})

	splitted.New(`even`, func(ctx context.Context, key interface{}, val interface{}) (bool, error) {
		n, ok := val.(int)
		if !ok {
			return false, fmt.Errorf(`even branch: expected int value, got %T`, val)
		}
		return n%2 == 0, nil
	})

	// Each callbacks have no error return, so the assertion stays unchecked.
	// NOTE(review): presumably only values accepted by the matching predicate
	// above reach these callbacks, which would make them always int — confirm
	// against the Split/Branch semantics.
	splitted.Branch(`odd`).Each(func(ctx context.Context, key, value interface{}) {
		println(`Odd number:`, value.(int))
	})

	splitted.Branch(`even`).Each(func(ctx context.Context, key, value interface{}) {
		println(`Even number:`, value.(int))
	})
}
// seed publishes the numbers 1 through 100, each encoded as its decimal
// string, to TopicNumbers inside a single producer transaction. Every
// failure along the way panics. A debug line is logged per record and an
// info line once the transaction has been committed.
func seed(logger log.Logger) {
	ctx := context.Background()

	producerConf := librd.NewProducerConfig()
	producerConf.BootstrapServers = strings.Split(*bootstrapServers, `,`)
	producerConf.Transactional.Enabled = true
	producerConf.Transactional.Id = `words-producer`

	producer, err := librd.NewProducer(producerConf)
	if err != nil {
		panic(err)
	}

	// The transaction calls below are made through the
	// kafka.TransactionalProducer view of the producer.
	txProducer := producer.(kafka.TransactionalProducer)

	err = txProducer.InitTransactions(ctx)
	if err != nil {
		panic(err)
	}

	err = txProducer.BeginTransaction()
	if err != nil {
		panic(err)
	}

	for n := 1; n <= 100; n++ {
		payload := []byte(fmt.Sprint(n))
		record := producer.NewRecord(ctx, nil, payload, TopicNumbers, kafka.PartitionAny, time.Now(), nil, ``)

		if produceErr := txProducer.ProduceAsync(ctx, record); produceErr != nil {
			panic(produceErr)
		}

		logger.Debug(`message produced to`, TopicNumbers)
	}

	err = txProducer.CommitTransaction(ctx)
	if err != nil {
		panic(err)
	}

	logger.Info(`Test records produced`)
}