-
Notifications
You must be signed in to change notification settings - Fork 0
/
http.go
151 lines (128 loc) · 3.37 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
Package http implements a provider that allows jobs to be manually inserted into the manager through an HTTP API.
This provider can be enabled through the config file, just like any other provider. However, the manager will not poll this provider for jobs.
*/
package http
import (
"errors"
"fmt"
"io/ioutil"
"log"
std_http "net/http"
"time"
"github.com/barracudanetworks/GoWorker/job"
"github.com/barracudanetworks/GoWorker/provider"
)
// DEFAULT_ENDPOINT is the default URL path for the job-submission endpoint.
const DEFAULT_ENDPOINT = "/job/add"
// STATUS_TO_HTTP_CODE maps a job status to the HTTP status code that
// represents it: success is 200 OK, failure is 500 Internal Server Error.
var STATUS_TO_HTTP_CODE = map[job.Status]uint{
	job.STATUS_FAILURE: std_http.StatusInternalServerError,
	job.STATUS_SUCCESS: std_http.StatusOK,
}

// NOT_HTTP_JOB is the error ConfirmJob returns when it is handed a job that
// is not a *HttpJob.
var NOT_HTTP_JOB = errors.New("This job is not of the correct type. Expecting type *HttpJob")
// init registers HttpFactory under the name "http" in the provider factory
// table. If the table has not been allocated, the registration is skipped and
// a message is logged instead.
func init() {
	if provider.Factories != nil {
		provider.Factories["http"] = HttpFactory
		return
	}
	log.Println("unable to load provider")
}
// HttpConfig holds the settings the Http provider is initialised with.
type HttpConfig struct {
	// ListOn is the address handed to net/http's ListenAndServe
	// (JSON key "listen_on").
	ListOn string `json:"listen_on"`
	// Endpoint is the pattern the job-submission handler is registered
	// under on the provider's ServeMux (JSON key "endpoint").
	Endpoint string `json:"endpoint"`
}
// Http is a provider through which jobs are inserted manually over an HTTP
// endpoint rather than being fetched from a queue.
type Http struct {
	// jobChan is the channel the HTTP handler sends new jobs on; it is
	// assigned by RequestWork.
	jobChan chan job.Job
	// endPoint is the pattern the job handler was registered under.
	endPoint string
	// listen_on is the address the web server was asked to listen on.
	listen_on string
	// server is the mux that routes incoming requests to the job handler.
	server *std_http.ServeMux
}
// ConfigStruct returns a pointer to an empty HttpConfig, the config type that
// Init expects to receive.
func (h *Http) ConfigStruct() interface{} {
	return new(HttpConfig)
}
// Init configures the provider from i, which must be a non-nil *HttpConfig,
// registers the job handler on a fresh ServeMux and starts the web server in
// a background goroutine.
//
// When the config leaves Endpoint empty, DEFAULT_ENDPOINT is used instead;
// registering an empty pattern on a ServeMux would panic.
//
// It returns provider.WRONG_CONFIG_TYPE if i is not a usable *HttpConfig and
// nil otherwise. Failures of the listener itself are not reported through the
// return value: the server goroutine passes the error from ListenAndServe to
// log.Fatal, which terminates the process.
func (h *Http) Init(i interface{}) error {
	conf, ok := i.(*HttpConfig)
	// A typed nil pointer passes the type assertion but cannot be read from.
	if !ok || conf == nil {
		return provider.WRONG_CONFIG_TYPE
	}

	endpoint := conf.Endpoint
	if endpoint == "" {
		endpoint = DEFAULT_ENDPOINT
	}
	addr := conf.ListOn

	h.endPoint = endpoint
	h.listen_on = addr
	h.server = std_http.NewServeMux()
	h.server.HandleFunc(endpoint, h.addNewJob)

	// Serve in the background so Init can return to the caller.
	go func() {
		log.Fatal(std_http.ListenAndServe(addr, h.server))
	}()
	return nil
}
// ConfirmJob acknowledges a finished job. The job must be a *HttpJob;
// anything else yields NOT_HTTP_JOB. For a *HttpJob it receives once from the
// job's confirmChan, which releases the HTTP handler that is blocked sending
// on that channel, and then returns nil.
func (h *Http) ConfirmJob(j job.Job) error {
	if httpJob, isHTTP := j.(*HttpJob); isHTTP {
		// Blocks until the handler holding the request open is ready.
		<-httpJob.confirmChan
		return nil
	}
	return NOT_HTTP_JOB
}
// RequestWork stores c as the channel the HTTP handler delivers jobs on and
// then blocks forever, keeping the provider open. It never returns; n is not
// used.
func (h *Http) RequestWork(n int, c chan job.Job) error {
	h.jobChan = c
	// Park this goroutine permanently.
	select {}
}
// WaitTime always returns a zero duration; the argument t is ignored.
func (h *Http) WaitTime(t float64) time.Duration {
	var noWait time.Duration
	return noWait
}
// Close is a no-op that always returns nil. It does not stop the web server
// started by Init.
func (h *Http) Close() error {
	return nil
}
// Target always returns 0, because this provider cannot be rate limited.
func (h *Http) Target() float64 {
	const noTarget float64 = 0
	return noTarget
}
// Name returns the provider's name: the prefix "http_" followed by the listen
// address and the endpoint pattern.
func (h *Http) Name() string {
	suffix := h.listen_on + h.endPoint
	return "http_" + suffix
}
// addNewJob is the http.HandlerFunc registered on the provider's endpoint. It
// reads the request body, parses it into a job config, wraps that config in an
// HttpJob and sends the job on jobChan. The handler then blocks sending on the
// job's confirmChan until ConfirmJob receives from it, which keeps the request
// (and its ResponseWriter, used as the job's OutputWriter) open for that long.
//
// Error responses:
//   - 400 Bad Request with the error text when the body cannot be read or
//     cannot be parsed as a job config; no job is created.
//   - 503 Service Unavailable when no job channel has been installed yet
//     (RequestWork has not run); sending on a nil channel would otherwise hang
//     the request forever.
func (h *Http) addNewJob(rw std_http.ResponseWriter, req *std_http.Request) {
	b, err := ioutil.ReadAll(req.Body)
	if err != nil {
		// Stop here: a partially read body must not be parsed as a job.
		rw.WriteHeader(std_http.StatusBadRequest)
		fmt.Fprint(rw, err)
		return
	}

	jc, err := job.ParseConfig(b)
	if err != nil {
		// Log the body as text; a raw []byte prints as a list of numbers.
		log.Println(err, string(b))
		rw.WriteHeader(std_http.StatusBadRequest)
		fmt.Fprint(rw, err)
		return
	}

	jobs := h.jobChan
	if jobs == nil {
		rw.WriteHeader(std_http.StatusServiceUnavailable)
		fmt.Fprint(rw, "provider is not accepting jobs yet")
		return
	}

	// make the job able to write back to the requester
	jc.OutputWriter = rw

	// create a new HttpJob and send it to the manager
	j := &HttpJob{
		config:         jc,
		provider:       h,
		responseWriter: rw,
		request:        req,
		confirmChan:    make(chan struct{}),
	}
	jobs <- j

	// hold this request open until the response is ready to be written
	j.confirmChan <- struct{}{}
}
// HttpFactory builds a new, uninitialised Http provider. It is the factory
// that init registers under the name "http".
func HttpFactory() provider.Provider {
	return new(Http)
}