This repository has been archived by the owner on Jan 9, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
container.go
87 lines (75 loc) · 2.18 KB
/
container.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Copyright 2016 Arsham Shirvani <arshamshirvani@gmail.com>. All rights reserved.
// Use of this source code is governed by the Apache 2.0 license
// License that can be found in the LICENSE file.
package datatype
import (
"bytes"
"fmt"
"io"
"sync"
"time"
"github.com/antonholmquist/jason"
"github.com/pkg/errors"
)
// TimeStampFormat is the time layout (in the reference-time notation of the
// standard time package) that Container.Generate passes to time.Time.Format
// when rendering the "@timestamp" value. The layout yields an ISO-8601 style
// date and time, up to six fractional-second digits with trailing zeros
// dropped, and a numeric zone offset written with a colon. It is a variable,
// so reassigning it changes the output of every later Generate call.
var TimeStampFormat = "2006-01-02T15:04:05.999999-07:00"
// Container holds a list of DataType values behind an embedded read/write
// mutex. JobResultDataTypes returns a *Container as a DataContainer.
type Container struct {
	// RWMutex guards list: List and Len take the read lock, Add takes the
	// write lock. Because it is embedded, its locking methods are promoted
	// onto Container.
	sync.RWMutex
	// genMu is not locked by any method visible in this part of the file.
	// NOTE(review): confirm whether it is still needed.
	genMu sync.Mutex
	// list is the stored data, in insertion order.
	list []DataType
}
// New builds a Container whose contents are the given list and returns a
// pointer to it. The slice is stored as passed in, without copying, so the
// container shares its backing array with the caller.
func New(list []DataType) *Container {
	c := new(Container)
	c.list = list
	return c
}
// List returns the stored items. The slice header is read while holding the
// read lock; the returned slice is the container's own slice, not a copy.
func (c *Container) List() []DataType {
	c.RLock()
	items := c.list
	c.RUnlock()
	return items
}
// Len reports how many items are currently stored, read under the read lock.
func (c *Container) Len() int {
	c.RLock()
	n := len(c.list)
	c.RUnlock()
	return n
}
// Add appends every given item to the end of the list while holding the
// write lock. It accepts any number of items; calling it with none leaves
// the contents unchanged.
func (c *Container) Add(d ...DataType) {
	c.Lock()
	defer c.Unlock()
	c.list = append(c.list, d...)
}
// Generate writes one JSON-style object to p. The object opens with an
// "@timestamp" member holding timestamp formatted with TimeStampFormat, and
// is followed by the bytes read from each item of the list, every item
// preceded by a comma. Items are presumably expected to yield `"key":value`
// fragments; verify against the DataType implementations.
//
// The whole object is assembled in memory and handed to p in a single Write
// call, whose byte count and error are returned. If reading any item fails,
// nothing is written to p and Generate returns 0 together with the error
// wrapped with the message "writing item".
func (c *Container) Generate(p io.Writer, timestamp time.Time) (int, error) {
	// Serialising Generate calls through genMu is left disabled:
	// c.genMu.Lock()
	// defer c.genMu.Unlock()
	var doc bytes.Buffer
	fmt.Fprintf(&doc, `{"@timestamp":"%s"`, timestamp.Format(TimeStampFormat))
	for _, item := range c.List() {
		doc.WriteByte(',')
		// ReadFrom drains the item until EOF, appending its bytes to doc.
		if _, err := doc.ReadFrom(item); err != nil {
			return 0, errors.Wrap(err, "writing item")
		}
	}
	doc.WriteByte('}')
	return p.Write(doc.Bytes())
}
// JobResultDataTypes turns the raw bytes b into a DataContainer. It parses b
// with jason.NewObjectFromBytes and passes the resulting map to
// mapper.Values with an empty first argument (presumably a key prefix;
// confirm against Mapper). The values that come back are wrapped in a new
// Container.
//
// A parse failure is returned unchanged. When the mapper yields no values,
// the expUnidentifiedJSON counter is incremented by one and
// ErrUnidentifiedJason is returned. The container is nil in both error cases.
func JobResultDataTypes(b []byte, mapper Mapper) (DataContainer, error) {
	parsed, err := jason.NewObjectFromBytes(b)
	if err != nil {
		return nil, err
	}
	if values := mapper.Values("", parsed.Map()); len(values) > 0 {
		return New(values), nil
	}
	// Nothing recognisable was extracted from the document.
	expUnidentifiedJSON.Add(1)
	return nil, ErrUnidentifiedJason
}