Skip to content

Commit

Permalink
Add support to timestamp stage to parse Unix seconds, milliseconds, a…
Browse files Browse the repository at this point in the history
…nd nanosecond timestamps
  • Loading branch information
slim-bean committed Jul 10, 2019
1 parent 1cc25f8 commit de83272
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 33 deletions.
14 changes: 13 additions & 1 deletion docs/logentry/processing-log-lines.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,18 @@ RFC3339 = "2006-01-02T15:04:05Z07:00"
RFC3339Nano = "2006-01-02T15:04:05.999999999Z07:00"
```

Additionally support for common Unix timestamps is supported:

```go
Unix = 1562708916
UnixMs = 1562708916414
UnixNs = 1562708916000000123
```

Finally any custom format can be supplied, and will be passed directly in as the layout parameter in time.Parse()

__Read the [time.parse](https://golang.org/pkg/time/#Parse) docs closely if passing a custom format and make sure your custom format uses the special date they specify: `Mon Jan 2 15:04:05 -0700 MST 2006`__

##### Example:

```yaml
Expand Down Expand Up @@ -453,4 +465,4 @@ Gauge examples will be very similar to Counter examples with additional `action`
buckets: [0.001,0.0025,0.005,0.010,0.025,0.050]
```

This would create a Histogram which looks for _response_time_ in the `extracted` data and applies the value to the histogram.
This would create a Histogram which looks for _response_time_ in the `extracted` data and applies the value to the histogram.
24 changes: 16 additions & 8 deletions pkg/logentry/stages/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const (
ErrEmptyTimestampStageConfig = "timestamp stage config cannot be empty"
ErrTimestampSourceRequired = "timestamp source value is required if timestamp is specified"
ErrTimestampFormatRequired = "timestamp format is required"

Unix = "Unix"
UnixMs = "UnixMs"
UnixNs = "UnixNs"
)

// TimestampConfig configures timestamp extraction
Expand All @@ -23,16 +27,19 @@ type TimestampConfig struct {
Format string `mapstructure:"format"`
}

// parser can convert the time string into a time.Time value
type parser func(string) (time.Time, error)

// validateTimestampConfig validates a timestampStage configuration
func validateTimestampConfig(cfg *TimestampConfig) (string, error) {
func validateTimestampConfig(cfg *TimestampConfig) (parser, error) {
if cfg == nil {
return "", errors.New(ErrEmptyTimestampStageConfig)
return nil, errors.New(ErrEmptyTimestampStageConfig)
}
if cfg.Source == "" {
return "", errors.New(ErrTimestampSourceRequired)
return nil, errors.New(ErrTimestampSourceRequired)
}
if cfg.Format == "" {
return "", errors.New(ErrTimestampFormatRequired)
return nil, errors.New(ErrTimestampFormatRequired)
}
return convertDateLayout(cfg.Format), nil

Expand All @@ -45,22 +52,22 @@ func newTimestampStage(logger log.Logger, config interface{}) (*timestampStage,
if err != nil {
return nil, err
}
format, err := validateTimestampConfig(cfg)
parser, err := validateTimestampConfig(cfg)
if err != nil {
return nil, err
}
return &timestampStage{
cfgs: cfg,
logger: logger,
format: format,
parser: parser,
}, nil
}

// timestampStage will set the timestamp using extracted data
type timestampStage struct {
cfgs *TimestampConfig
logger log.Logger
format string
parser parser
}

// Process implements Stage
Expand All @@ -73,7 +80,8 @@ func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]in
if err != nil {
level.Debug(ts.logger).Log("msg", "failed to convert extracted time to string", "err", err, "type", reflect.TypeOf(v).String())
}
parsedTs, err := time.Parse(ts.format, s)

parsedTs, err := ts.parser(s)
if err != nil {
level.Debug(ts.logger).Log("msg", "failed to parse time", "err", err, "format", ts.cfgs.Format, "value", s)
} else {
Expand Down
74 changes: 62 additions & 12 deletions pkg/logentry/stages/timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ func TestTimestampPipeline(t *testing.T) {

func TestTimestampValidation(t *testing.T) {
tests := map[string]struct {
config *TimestampConfig
err error
expectedFormat string
config *TimestampConfig
err error
testString string
expectedTime time.Time
}{
"missing config": {
config: nil,
Expand All @@ -68,23 +69,34 @@ func TestTimestampValidation(t *testing.T) {
Source: "source1",
Format: time.RFC3339,
},
err: nil,
expectedFormat: time.RFC3339,
err: nil,
testString: "2012-11-01T22:08:41-04:00",
expectedTime: time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", -4*60*60)),
},
"custom format": {
config: &TimestampConfig{
Source: "source1",
Format: "2006-01-23",
Format: "2006-01-02",
},
err: nil,
expectedFormat: "2006-01-23",
err: nil,
testString: "2009-01-01",
expectedTime: time.Date(2009, 01, 01, 00, 00, 00, 0, time.UTC),
},
"unix_ms": {
config: &TimestampConfig{
Source: "source1",
Format: "UnixMs",
},
err: nil,
testString: "1562708916919",
expectedTime: time.Date(2019, 7, 9, 21, 48, 36, 919*1000000, time.UTC),
},
}
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
format, err := validateTimestampConfig(test.config)
parser, err := validateTimestampConfig(test.config)
if (err != nil) != (test.err != nil) {
t.Errorf("validateOutputConfig() expected error = %v, actual error = %v", test.err, err)
return
Expand All @@ -93,8 +105,13 @@ func TestTimestampValidation(t *testing.T) {
t.Errorf("validateOutputConfig() expected error = %v, actual error = %v", test.err, err)
return
}
if test.expectedFormat != "" {
assert.Equal(t, test.expectedFormat, format)
if test.testString != "" {
ts, err := parser(test.testString)
if err != nil {
t.Errorf("validateOutputConfig() unexpected error parsing test time: %v", err)
return
}
assert.Equal(t, test.expectedTime.UnixNano(), ts.UnixNano())
}
})
}
Expand All @@ -117,6 +134,39 @@ func TestTimestampStage_Process(t *testing.T) {
},
time.Date(2106, 01, 02, 23, 04, 05, 0, time.FixedZone("", -4*60*60)),
},
"unix success": {
TimestampConfig{
Source: "ts",
Format: "Unix",
},
map[string]interface{}{
"somethigelse": "notimportant",
"ts": "1562708916",
},
time.Date(2019, 7, 9, 21, 48, 36, 0, time.UTC),
},
"unix millisecond success": {
TimestampConfig{
Source: "ts",
Format: "UnixMs",
},
map[string]interface{}{
"somethigelse": "notimportant",
"ts": "1562708916414",
},
time.Date(2019, 7, 9, 21, 48, 36, 414*1000000, time.UTC),
},
"unix nano success": {
TimestampConfig{
Source: "ts",
Format: "UnixNs",
},
map[string]interface{}{
"somethigelse": "notimportant",
"ts": "1562708916000000123",
},
time.Date(2019, 7, 9, 21, 48, 36, 123, time.UTC),
},
}
for name, test := range tests {
test := test
Expand All @@ -129,7 +179,7 @@ func TestTimestampStage_Process(t *testing.T) {
ts := time.Now()
lbls := model.LabelSet{}
st.Process(lbls, test.extracted, &ts, nil)
assert.Equal(t, test.expected, ts)
assert.Equal(t, test.expected.UnixNano(), ts.UnixNano())

})
}
Expand Down
70 changes: 58 additions & 12 deletions pkg/logentry/stages/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,76 @@ import (
)

// convertDateLayout converts pre-defined date format layout into date format
func convertDateLayout(predef string) string {
func convertDateLayout(predef string) parser {
switch predef {
case "ANSIC":
return time.ANSIC
return func(t string) (time.Time, error) {
return time.Parse(time.ANSIC, t)
}
case "UnixDate":
return time.UnixDate
return func(t string) (time.Time, error) {
return time.Parse(time.UnixDate, t)
}
case "RubyDate":
return time.RubyDate
return func(t string) (time.Time, error) {
return time.Parse(time.RubyDate, t)
}
case "RFC822":
return time.RFC822
return func(t string) (time.Time, error) {
return time.Parse(time.RFC822, t)
}
case "RFC822Z":
return time.RFC822Z
return func(t string) (time.Time, error) {
return time.Parse(time.RFC822Z, t)
}
case "RFC850":
return time.RFC850
return func(t string) (time.Time, error) {
return time.Parse(time.RFC850, t)
}
case "RFC1123":
return time.RFC1123
return func(t string) (time.Time, error) {
return time.Parse(time.RFC1123, t)
}
case "RFC1123Z":
return time.RFC1123Z
return func(t string) (time.Time, error) {
return time.Parse(time.RFC1123Z, t)
}
case "RFC3339":
return time.RFC3339
return func(t string) (time.Time, error) {
return time.Parse(time.RFC3339, t)
}
case "RFC3339Nano":
return time.RFC3339Nano
return func(t string) (time.Time, error) {
return time.Parse(time.RFC3339Nano, t)
}
case "Unix":
return func(t string) (time.Time, error) {
i, err := strconv.ParseInt(t, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(i, 0), nil
}
case "UnixMs":
return func(t string) (time.Time, error) {
i, err := strconv.ParseInt(t, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(0, i*int64(time.Millisecond)), nil
}
case "UnixNs":
return func(t string) (time.Time, error) {
i, err := strconv.ParseInt(t, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(0, i), nil
}
default:
return predef
return func(t string) (time.Time, error) {
return time.Parse(predef, t)
}
}
}

Expand Down

0 comments on commit de83272

Please sign in to comment.