This repository has been archived by the owner on Aug 14, 2020. It is now read-only.
/
runner.go
131 lines (111 loc) · 2.44 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/*
*
* In The Name of God
*
* +===============================================
* | Author: Parham Alvani <parham.alvani@gmail.com>
* |
* | Creation Date: 22-07-2018
* |
* | File Name: runner.go
* +===============================================
*/
package core
import (
"fmt"
"log"
"net/url"
"sync"
"time"
"github.com/I1820/lg/generators"
)
// Pick supplies the next piece of input data for a generator.
// Runner.Run calls it once per tick and hands the returned value straight to
// the generator's Generate method; the value's concrete type is not
// interpreted here.
type Pick func() interface{}
// Transport delivers generated data to a topic over a specific network
// protocol. NewRunner chooses the implementation from the scheme of the
// configured URL (HTTPTransport for http/https, MQTTTransport for mqtt).
type Transport interface {
// Init prepares the transport before first use. NewRunner passes the host
// part of the configured URL as url and the configured token as token, and
// aborts runner creation when Init returns an error.
Init(url string, token string) error
// Transmit sends data to topic. Runner.Run logs a returned error and keeps
// running.
Transmit(topic string, data []byte) error
}
// RunnerConfig contains the runner configuration.
// It specifies the generator, the send interval, the data picker and the
// destination (URL and token) that NewRunner uses to build a Runner.
type RunnerConfig struct {
// Generator builds each message and provides the topic it is sent on.
Generator generators.Generator
// Duration is the interval between two consecutive generate/transmit rounds.
Duration time.Duration
// Pick returns the value handed to Generator.Generate on every round.
Pick Pick
// Token is passed unchanged to Transport.Init.
// NOTE(review): presumably an authentication token — confirm against the transports.
Token string
// URL selects the transport by its scheme (http, https or mqtt); only its
// host part is forwarded to Transport.Init.
URL string
}
// Runner runs the given generator at a fixed interval and transmits every
// generated message through its transport.
// NOTE(review): Runner holds a sync.RWMutex, so a value should not be copied
// once it is in use. NewRunner returns a Runner by value; callers presumably
// store it once and use it through its address — confirm.
type Runner struct {
// generator builds each message and supplies the topic it is sent on.
generator generators.Generator
// duration is the tick interval used by Run.
duration time.Duration
// counter is the number of messages counted by Run; it is guarded by lck and
// read through Count.
counter int64
// pick provides the value handed to generator.Generate on every tick.
pick Pick
// transport sends the generated messages.
transport Transport
// lck guards counter.
lck sync.RWMutex
// stop is closed by Stop to make the goroutine started by Run return.
stop chan struct{}
}
// NewRunner builds a Runner from config.
//
// The transport is chosen from the scheme of config.URL: "http" and "https"
// use HTTPTransport, "mqtt" uses MQTTTransport. The transport is initialised
// with the host part of the URL and config.Token before the Runner is
// returned.
//
// An error is returned (together with a zero Runner) when the URL cannot be
// parsed, when its scheme is not one of the supported ones, or when the
// transport fails to initialise.
func NewRunner(config RunnerConfig) (Runner, error) {
	parsed, err := url.Parse(config.URL)
	if err != nil {
		return Runner{}, err
	}

	// Map the URL scheme onto a concrete transport implementation.
	var transport Transport
	scheme := parsed.Scheme
	switch {
	case scheme == "mqtt":
		transport = &MQTTTransport{}
	case scheme == "http" || scheme == "https":
		transport = &HTTPTransport{}
	default:
		return Runner{}, fmt.Errorf("Scheme %s is not supported yet", scheme)
	}

	// Only the host part of the URL is handed to the transport.
	initErr := transport.Init(parsed.Host, config.Token)
	if initErr != nil {
		return Runner{}, initErr
	}

	// counter is left at its zero value; the stop channel is unbuffered and
	// is only ever closed, never sent on.
	return Runner{
		stop:      make(chan struct{}),
		transport: transport,
		pick:      config.Pick,
		duration:  config.Duration,
		generator: config.Generator,
	}, nil
}
// Count returns the number of messages counted by Run so far.
// The counter is read under the read lock, so Count may be called while the
// runner is running.
func (r *Runner) Count() int64 {
	r.lck.RLock()
	generated := r.counter
	r.lck.RUnlock()

	return generated
}
// Stop signals the goroutine started by Run to return by closing the stop
// channel. It does not wait for that goroutine to finish.
//
// Stop is idempotent: closing an already closed channel panics, so the
// channel is closed only on the first call. The check and the close happen
// under the runner's lock so that concurrent Stop calls cannot both close it.
func (r *Runner) Stop() {
	r.lck.Lock()
	defer r.lck.Unlock()

	select {
	case <-r.stop:
		// Already closed by an earlier Stop; nothing left to do.
	default:
		close(r.stop)
	}
}
// Run starts the runner in a background goroutine and returns immediately.
//
// Once every r.duration the goroutine picks a value with r.pick, asks the
// generator to build a message from it, transmits that message on the
// generator's topic and increments the message counter. The goroutine returns
// when the stop channel is closed (see Stop).
//
// Error handling:
//   - If generation fails, the error is logged and the tick is skipped:
//     nothing is transmitted and the counter is not incremented, because the
//     returned message is meaningless when err is non-nil.
//   - If transmission fails, the error is logged; the message was still
//     generated, so it is counted.
//
// A non-positive duration never produces a tick; the goroutine then simply
// waits for Stop.
func (r *Runner) Run() {
	// time.NewTicker panics for non-positive durations. A nil channel blocks
	// forever in select, which is the "never tick" behaviour wanted here.
	var sendTick <-chan time.Time
	stopTicker := func() {}
	if r.duration > 0 {
		ticker := time.NewTicker(r.duration)
		sendTick = ticker.C
		stopTicker = ticker.Stop
	}

	go func() {
		// Release the ticker as soon as the runner is stopped.
		defer stopTicker()

		for {
			select {
			case <-sendTick:
				message, err := r.generator.Generate(r.pick())
				if err != nil {
					log.Printf("Generator Generate: %s", err)
					// Do not transmit or count a message that was never generated.
					continue
				}

				if err := r.transport.Transmit(r.generator.Topic(), message); err != nil {
					log.Printf("Transmit: %s", err)
				}

				r.lck.Lock()
				r.counter++
				r.lck.Unlock()
			case <-r.stop:
				return
			}
		}
	}()
}