Permalink
Browse files

LoggerをBulk Requestするように修正

  • Loading branch information...
nomeaning777 committed Oct 20, 2018
1 parent ae67974 commit 4c4c3400bcd95d1d42a42f70e76734c1c70b1b93
@@ -99,7 +99,8 @@ var banList map[string]int
var banMutex *sync.Mutex

func checkBan(bankID string) bool {

//return false
//
if banList == nil {
banList = make(map[string]int)
banMutex = &sync.Mutex{}
@@ -110,11 +111,13 @@ func checkBan(bankID string) bool {
}

func incrementBan(bankID string) {
//return
banMutex.Lock()
defer banMutex.Unlock()
banList[bankID]++
}
func resetBan(bankID string) {
//return
banMutex.Lock()
defer banMutex.Unlock()
banList[bankID] = 0
@@ -252,7 +255,7 @@ func (h *Handler) Info(w http.ResponseWriter, r *http.Request, _ httprouter.Para
res["highest_buy_price"] = highestBuyOrder.Price
}
// TODO: trueにするとシェアボタンが有効になるが、アクセスが増えてヤバイので一旦falseにしておく
res["enable_share"] = false
res["enable_share"] = true
h.handleSuccess(w, res)
}

@@ -69,7 +69,14 @@ func Logger(d QueryExecutor) (*isulogger.Isulogger, error) {
if err != nil {
return nil, errors.Wrapf(err, "getSetting failed. %s", LogAppid)
}
return isulogger.NewIsulogger(ep, id)
if isulogger.Logger == nil || isulogger.Logger.AppId != id || isulogger.Logger.EndPoint != ep {
// TODO: メモリリークを修正する
isulogger.Logger, err = isulogger.NewIsulogger(ep, id)
if err != nil {
return nil, err
}
}
return isulogger.Logger, nil
}

func sendLog(d QueryExecutor, tag string, v interface{}) {
@@ -25,8 +25,13 @@ type Log struct {
type Isulogger struct {
endpoint *url.URL
appID string
AppId string
EndPoint string
logChan chan *Log
}

var Logger *Isulogger

// NewIsulogger はIsuloggerを初期化します
//
// endpoint: ISULOGを利用するためのエンドポイントURI
@@ -36,19 +41,25 @@ func NewIsulogger(endpoint, appID string) (*Isulogger, error) {
if err != nil {
return nil, err
}
return &Isulogger{
logger := &Isulogger{
endpoint: u,
appID: appID,
}, nil
AppId: appID,
EndPoint: endpoint,
logChan: make(chan *Log, 10000),
}
go logger.process(logger.logChan)
return logger, nil
}

// Send はログを送信します
func (b *Isulogger) Send(tag string, data interface{}) error {
return b.request("/send", Log{
b.logChan <- &Log{
Tag: tag,
Time: time.Now(),
Data: data,
})
}
return nil
}

func (b *Isulogger) request(p string, v interface{}) error {
@@ -82,3 +93,77 @@ func (b *Isulogger) request(p string, v interface{}) error {
}
return fmt.Errorf("logger status is not ok. code: %d, body: %s", res.StatusCode, string(bo))
}

func (b *Isulogger) requestBluk(p *bytes.Buffer) error {
u := new(url.URL)
*u = *b.endpoint
u.Path = path.Join(u.Path, "send_bulk")

req, err := http.NewRequest(http.MethodPost, u.String(), p)
if err != nil {
return fmt.Errorf("logger new request failed. err: %s", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+b.appID)

res, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("logger request failed. err: %s", err)
}
defer res.Body.Close()
bo, err := ioutil.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("logger body read failed. err: %s", err)
}
if res.StatusCode == http.StatusOK {
return nil
}
return fmt.Errorf("logger status is not ok. code: %d, body: %s", res.StatusCode, string(bo))
}

func (b *Isulogger) process(logChan chan *Log) {
t := time.NewTicker(5 * time.Second)
r := bytes.NewBuffer(make([]byte, 0))
r.WriteString("[")
first := true
for {
select {
case <-t.C:
// FlushBuffer
if first {
continue
}
r.WriteString("]")
if err := b.requestBluk(r); err != nil {
fmt.Println(err)
}

first = true
r = bytes.NewBuffer(make([]byte, 0))
r.WriteString("[")
case log := <-logChan:
// logの追加
j, err := json.Marshal(log)
if err != nil {
fmt.Errorf("logger: Failed to marshal json\n")
continue
}
if r.Len()+len(j) >= 10*100*1000-1 {
// 9KBに到達したらFlushする
r.WriteString("]")
if err := b.requestBluk(r); err != nil {
fmt.Println(err)
}

first = true
r = bytes.NewBuffer(make([]byte, 1024*1024))
r.WriteString("[")
}
if !first {
r.WriteString(",")
}
r.Write(j)
first = false
}
}
}

0 comments on commit 4c4c340

Please sign in to comment.