-
Notifications
You must be signed in to change notification settings - Fork 0
/
master.go
82 lines (65 loc) · 1.77 KB
/
master.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package master
import (
"io"
"net/http"
"strconv"
"sync"
"github.com/benmizrahi/gobig/internal/common"
"github.com/benmizrahi/gobig/internal/protos"
"github.com/gin-gonic/gin"
"github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
ginlogrus "github.com/toorop/gin-logrus"
)
// singeltone instance of master!
var lock = &sync.Mutex{}
// Singel instance
var masterInstance *Master
type Master struct {
MasterPath string
context *Context
Http *gin.Engine
}
func NewMaster(isLocal bool, host string, port int, minWorkers int) *Master {
if masterInstance == nil {
lock.Lock()
log.Info("gobig Master, Creating new master instance")
m := &Master{
MasterPath: host + ":" + strconv.Itoa(port),
Http: gin.New(),
}
m.Http.Use(ginlogrus.Logger(logrus.New()), gin.Recovery())
m.Http.POST("/api/register", m.RegisterHandler)
go m.Http.Run(m.MasterPath)
log.Info("gobig Master, master is listening on ", m.MasterPath)
m.context = NewContext(isLocal, minWorkers, m.MasterPath)
m.context.InitContext()
lock.Unlock()
return m
}
return masterInstance
}
func (m *Master) RegisterHandler(c *gin.Context) {
buf, err := io.ReadAll(c.Request.Body)
if err != nil {
log.Fatalln("Failed to parse register request:", err)
}
req := &protos.RegisterReq{}
if err := proto.Unmarshal(buf, req); err != nil {
log.Fatalln("Failed to parse register request:", err)
}
m.context.Workers[req.Uuid] = req.Http
data := &protos.RegisterRes{
Ok: true,
}
c.ProtoBuf(http.StatusOK, data)
}
func (m *Master) Parallelize(data [][]string, option common.Options) *Mafream {
mf := NewDataFrame(m.context, []string{}, 1)
return mf
}
func (m *Master) Load() *Mafream {
mf := NewDataFrame(m.context, []string{}, 1)
return mf
}