From 0c7a28cb95e8e80c37f9ed999533c66fc83405bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ahl?= Date: Fri, 15 Mar 2019 22:38:26 +0100 Subject: [PATCH 1/8] feat: tail command command tail is a poor man tail that pool s3 for new access logs default every 1min. It possible to change how often it should pool for new access logs from s3 with flag --polling-interval --- cmd/tail.go | 65 ++++++++++++++++++++++++++++++++ logworker/logworker.go | 84 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 145 insertions(+), 4 deletions(-) create mode 100644 cmd/tail.go diff --git a/cmd/tail.go b/cmd/tail.go new file mode 100644 index 0000000..dd88c7e --- /dev/null +++ b/cmd/tail.go @@ -0,0 +1,65 @@ +package cmd + +import ( + "bytes" + "fmt" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/dbgeek/elblogcat/logcat" + "github.com/dbgeek/elblogcat/logworker" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +// tailCmd represents the tail command +var tailCmd = &cobra.Command{ + Use: "tail", + Short: "Porman tail pool for new accesslogs for default every 1min", + Long: ` +`, + Run: func(cmd *cobra.Command, args []string) { + awsConfiguration := logworker.AWSconfiguration{Region: "eu-west-1"} + configuration := logworker.NewConfiguration() + accessLogFilter := logworker.NewAccessLogFilter() + client := logworker.NewLogWorker( + &awsConfiguration, + &configuration, + &accessLogFilter, + ) + logs := make(chan string, 1) + + client.Tail(logs) + + for v := range logs { + buff := &aws.WriteAtBuffer{} + key := fmt.Sprintf("%s%s", accessLogFilter.AccesslogPath(configuration.Prefix), v) + _, err := client.S3Downloader.Download(buff, &s3.GetObjectInput{ + Bucket: aws.String(viper.GetString("s3-bucket")), + Key: aws.String(key), + }) + if err != nil { + logworker.Logger.Fatalf("Failed to Download key: %v from s3. Got error: %v", + key, + err) + } + + c := logcat.NewRowFilter() + b := bytes.NewBuffer(buff.Bytes()) + a := logcat.Accesslog{ + Content: b, + RowFilter: c, + PrintFields: viper.GetString("fields"), + } + a.Cat() + } + }, +} + +func init() { + rootCmd.AddCommand(tailCmd) + tailCmd.PersistentFlags().Duration("polling-interval", 60*time.Second, "") + viper.BindPFlag("polling-interval", tailCmd.PersistentFlags().Lookup("polling-interval")) + +} diff --git a/logworker/logworker.go b/logworker/logworker.go index e8b592d..547c522 100644 --- a/logworker/logworker.go +++ b/logworker/logworker.go @@ -31,8 +31,9 @@ type ( Profile string } Configuration struct { - Bucket string - Prefix string + Bucket string + Prefix string + PollingInterval time.Duration } AccessLogFilter struct { matchString string @@ -138,6 +139,80 @@ func (l *LogWorker) List() []string { return accessLogs } +func (l *LogWorker) Tail(logch chan<- string) { + go func() { + accessLogFilter := NewAccessLogFilter() + consumedAccessLogs := make(map[string]struct{}) + + lbAccessLogTimestamp := l.AccessLogFilter.StartTime + for t := lbAccessLogTimestamp; t.Before(time.Now().UTC()); t = t.Add(5 * time.Minute) { + lbAccessLogTimestamp = t + lbAccessLog := fmt.Sprintf("%s_elasticloadbalancing_%s_%s_%s", + accessLogFilter.AwsAccountID, + accessLogFilter.Region, + accessLogFilter.LoadBalancerID, + t.Format("20060102T1504Z"), + ) + s3Prefix := filepath.Join(l.AccessLogFilter.AccesslogPath(l.Configuration.Prefix), lbAccessLog) + for _, accessLog := range *l.listAccessLogs(s3Prefix) { + if _, ok := consumedAccessLogs[accessLog]; !ok { + consumedAccessLogs[accessLog] = struct{}{} + logch <- accessLog + } + } + } + + poller := time.Tick(l.Configuration.PollingInterval) + for now := range poller { + + lbAccessLogTimestamp = lbAccessLogTimestamp.Add(15 * time.Second) + lbAccessLog := fmt.Sprintf("%s_elasticloadbalancing_%s_%s_%s", + accessLogFilter.AwsAccountID, + accessLogFilter.Region, + accessLogFilter.LoadBalancerID, + now.UTC().Format("20060102T1504Z"), + ) + s3Prefix := filepath.Join(l.AccessLogFilter.AccesslogPath(l.Configuration.Prefix), lbAccessLog) + for _, accessLog := range *l.listAccessLogs(s3Prefix) { + if _, ok := consumedAccessLogs[accessLog]; !ok { + consumedAccessLogs[accessLog] = struct{}{} + logch <- accessLog + } + } + for k := range consumedAccessLogs { + ts := strings.Split(k, "_") + t, _ := time.Parse("20060102T1504Z", ts[4]) + if t.Before(now.UTC().Add(-2 * time.Minute)) { + delete(consumedAccessLogs, k) + } + + } + } + }() +} + +func (l *LogWorker) listAccessLogs(s3Prefix string) *[]string { + var al []string + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(l.Configuration.Bucket), + Prefix: aws.String(s3Prefix), + Delimiter: aws.String("/"), + MaxKeys: aws.Int64(200), + } + err := l.S3.ListObjectsV2Pages(input, + func(page *s3.ListObjectsV2Output, lastPage bool) bool { + for _, val := range page.Contents { + accessLog := strings.Split(*val.Key, "/")[len(strings.Split(*val.Key, "/"))-1] + al = append(al, accessLog) + } + return true + }) + if err != nil { + fmt.Println(err) + } + return &al +} + func (a *AccessLogFilter) AccesslogPath(prefix string) string { return filepath.Join(prefix, fmt.Sprintf("AWSLogs/%s/elasticloadbalancing/%s/%s/", a.AwsAccountID, a.Region, a.StartTime.Format("2006/01/02"))) + "/" @@ -197,7 +272,8 @@ func NewAccessLogFilter() AccessLogFilter { func NewConfiguration() Configuration { return Configuration{ - Bucket: viper.GetString("s3-bucket"), - Prefix: viper.GetString("s3-prefix"), + Bucket: viper.GetString("s3-bucket"), + Prefix: viper.GetString("s3-prefix"), + PollingInterval: viper.GetDuration("polling-interval"), } } From 7527e8d7f99ec432df9b9f868f450c8d75fcf954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ahl?= Date: Sun, 17 Mar 2019 12:34:53 +0100 Subject: [PATCH 2/8] ci: ignore Merge commit messages in changelog --- .goreleaser.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.goreleaser.yml b/.goreleaser.yml index 17c8629..039ead7 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -44,4 +44,5 @@ changelog: # Default is empty exclude: - '^docs:' - - '^ci:' \ No newline at end of file + - '^ci:' + - '^Merge' \ No newline at end of file From 317a6e15f3b6f6be28a568d641eef73dcea46359 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ahl?= Date: Sun, 17 Mar 2019 12:44:59 +0100 Subject: [PATCH 3/8] docs: Adding doc comments for Exported identifiers --- logworker/logworker.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/logworker/logworker.go b/logworker/logworker.go index 547c522..a2c212f 100644 --- a/logworker/logworker.go +++ b/logworker/logworker.go @@ -18,6 +18,7 @@ import ( ) type ( + // LogWorker worker LogWorker struct { Config *AWSconfiguration S3 *s3.S3 @@ -30,11 +31,14 @@ type ( Region string Profile string } + // Configuration hold the configuration that is needed. Configuration struct { Bucket string Prefix string PollingInterval time.Duration + MaxKeys int64 } + // AccessLogFilter .. AccessLogFilter struct { matchString string AwsAccountID string @@ -49,6 +53,7 @@ type ( ) var ( + // Logger instance of logrus.Logger Logger *logrus.Logger ) @@ -81,6 +86,7 @@ func newFilter(accessLogFilter *AccessLogFilter) *regexp.Regexp { return regexp } +// NewLogWorker return a pointer of LogWorker func NewLogWorker( awsConfiguration *AWSconfiguration, configuration *Configuration, @@ -114,6 +120,7 @@ func NewLogWorker( return &logWorker } +// List returns slice of string with accesslog names func (l *LogWorker) List() []string { var accessLogs []string @@ -139,6 +146,7 @@ func (l *LogWorker) List() []string { return accessLogs } +// Tail return chan of string func (l *LogWorker) Tail(logch chan<- string) { go func() { accessLogFilter := NewAccessLogFilter() @@ -213,6 +221,7 @@ func (l *LogWorker) listAccessLogs(s3Prefix string) *[]string { return &al } +// AccesslogPath return string of the key of accesslog (accesslog with full path of s3) func (a *AccessLogFilter) AccesslogPath(prefix string) string { return filepath.Join(prefix, fmt.Sprintf("AWSLogs/%s/elasticloadbalancing/%s/%s/", a.AwsAccountID, a.Region, a.StartTime.Format("2006/01/02"))) + "/" @@ -246,6 +255,7 @@ func (a *AccessLogFilter) filterByTime(accessLog string) bool { return false } +// NewAccessLogFilter Return AccessLogFilter func NewAccessLogFilter() AccessLogFilter { startTime, err := time.Parse("2006-01-02 15:04:05", viper.GetString("start-time")) @@ -270,6 +280,7 @@ func NewAccessLogFilter() AccessLogFilter { return accessLogFilter } +// NewConfiguration return Configuration func NewConfiguration() Configuration { return Configuration{ Bucket: viper.GetString("s3-bucket"), From 6cb1ea544143d31e4f32c475e39006d31264341c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ahl?= Date: Sun, 17 Mar 2019 12:55:08 +0100 Subject: [PATCH 4/8] refactor: define accesslog endtime fmt as constant --- logworker/logworker.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/logworker/logworker.go b/logworker/logworker.go index a2c212f..4d17e92 100644 --- a/logworker/logworker.go +++ b/logworker/logworker.go @@ -52,6 +52,10 @@ type ( } ) +const ( + accessLogEndTimeFormat string = "20060102T1504Z" +) + var ( // Logger instance of logrus.Logger Logger *logrus.Logger @@ -159,7 +163,7 @@ func (l *LogWorker) Tail(logch chan<- string) { accessLogFilter.AwsAccountID, accessLogFilter.Region, accessLogFilter.LoadBalancerID, - t.Format("20060102T1504Z"), + t.Format(accessLogEndTimeFormat), ) s3Prefix := filepath.Join(l.AccessLogFilter.AccesslogPath(l.Configuration.Prefix), lbAccessLog) for _, accessLog := range *l.listAccessLogs(s3Prefix) { @@ -178,7 +182,7 @@ func (l *LogWorker) Tail(logch chan<- string) { accessLogFilter.AwsAccountID, accessLogFilter.Region, accessLogFilter.LoadBalancerID, - now.UTC().Format("20060102T1504Z"), + now.UTC().Format(accessLogEndTimeFormat), ) s3Prefix := filepath.Join(l.AccessLogFilter.AccesslogPath(l.Configuration.Prefix), lbAccessLog) for _, accessLog := range *l.listAccessLogs(s3Prefix) { @@ -189,7 +193,7 @@ func (l *LogWorker) Tail(logch chan<- string) { } for k := range consumedAccessLogs { ts := strings.Split(k, "_") - t, _ := time.Parse("20060102T1504Z", ts[4]) + t, _ := time.Parse(accessLogEndTimeFormat, ts[4]) if t.Before(now.UTC().Add(-2 * time.Minute)) { delete(consumedAccessLogs, k) } @@ -229,7 +233,7 @@ func (a *AccessLogFilter) AccesslogPath(prefix string) string { func (a *AccessLogFilter) filterByTime(accessLog string) bool { accessLogEndTimeStr := strings.Split(accessLog, "_")[4] - accessLogEndTimeStamp, err := time.Parse("20060102T1504Z", accessLogEndTimeStr) + accessLogEndTimeStamp, err := time.Parse(accessLogEndTimeFormat, accessLogEndTimeStr) if err != nil { Logger.Fatalf("failed to parse timestamp for accesslog name") } From 65d0375315acf5ec997875d85e53b54670bd1756 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ahl?= Date: Sun, 17 Mar 2019 12:56:03 +0100 Subject: [PATCH 5/8] refactor: switching some fmt.Print to logrus. --- logworker/logworker.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/logworker/logworker.go b/logworker/logworker.go index 4d17e92..17a18f6 100644 --- a/logworker/logworker.go +++ b/logworker/logworker.go @@ -145,7 +145,7 @@ func (l *LogWorker) List() []string { return true }) if err != nil { - fmt.Println(err) + Logger.Fatalf("listObjectV2Pages return with error: %v", err) } return accessLogs } @@ -220,7 +220,7 @@ func (l *LogWorker) listAccessLogs(s3Prefix string) *[]string { return true }) if err != nil { - fmt.Println(err) + Logger.Fatalf("listObjectV2Pages return with error: %v", err) } return &al } @@ -265,12 +265,10 @@ func NewAccessLogFilter() AccessLogFilter { startTime, err := time.Parse("2006-01-02 15:04:05", viper.GetString("start-time")) if err != nil { Logger.Fatalf("Failed to parse start time. Gott error: %v", err) - fmt.Println("failed to parse starttime") } endTime, err := time.Parse("2006-01-02 15:04:05", viper.GetString("end-time")) if err != nil { Logger.Fatalf("Failed to parse end time. Gott error: %v", err) - fmt.Println("failed to parse endtime") } accessLogFilter := AccessLogFilter{} accessLogFilter.AwsAccountID = viper.GetString("aws-account-id") From be71675cdde9193f903e31bb89a43d1e5dfdd8bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ahl?= Date: Sun, 17 Mar 2019 12:58:02 +0100 Subject: [PATCH 6/8] feature: Adding global flag max-keys Possible to control nr keys that is return in respnse body from s3 api --- cmd/root.go | 2 ++ logworker/logworker.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/root.go b/cmd/root.go index 0eac364..7ac80e6 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -63,6 +63,8 @@ func init() { viper.BindPFlag("start-time", rootCmd.PersistentFlags().Lookup("start-time")) rootCmd.PersistentFlags().StringP("end-time", "", time.Now().Format("2006-01-02 15:04:05"), "") viper.BindPFlag("end-time", rootCmd.PersistentFlags().Lookup("end-time")) + rootCmd.PersistentFlags().Int64P("max-keys", "", 500, "control nr of keys that should be return from s3 api for each response.") + viper.BindPFlag("max-keys", rootCmd.PersistentFlags().Lookup("max-keys")) // Cobra also supports local flags, which will only run // when this action is called directly. diff --git a/logworker/logworker.go b/logworker/logworker.go index 17a18f6..d12e328 100644 --- a/logworker/logworker.go +++ b/logworker/logworker.go @@ -209,7 +209,7 @@ func (l *LogWorker) listAccessLogs(s3Prefix string) *[]string { Bucket: aws.String(l.Configuration.Bucket), Prefix: aws.String(s3Prefix), Delimiter: aws.String("/"), - MaxKeys: aws.Int64(200), + MaxKeys: aws.Int64(l.Configuration.MaxKeys), } err := l.S3.ListObjectsV2Pages(input, func(page *s3.ListObjectsV2Output, lastPage bool) bool { From ddaed12867093de5103fcea3711ed6762140a1d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ahl?= Date: Sun, 17 Mar 2019 13:38:00 +0100 Subject: [PATCH 7/8] fix: NewConfiguration did not set MaxKeys --- logworker/logworker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/logworker/logworker.go b/logworker/logworker.go index d12e328..94e7f9c 100644 --- a/logworker/logworker.go +++ b/logworker/logworker.go @@ -288,5 +288,6 @@ func NewConfiguration() Configuration { Bucket: viper.GetString("s3-bucket"), Prefix: viper.GetString("s3-prefix"), PollingInterval: viper.GetDuration("polling-interval"), + MaxKeys: viper.GetInt64("max-keys"), } } From 3776af014b0f487bdbc63af4daae6bf10bf7265a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ahl?= Date: Sun, 17 Mar 2019 13:39:40 +0100 Subject: [PATCH 8/8] docs: This cli Inspired by wolviecb. --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 0f6983c..ef22144 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ `elblogcat` is tool to list accesslogs for aws loadbalancers and cat them to see the content of them without need to download them local and cat them manual. +This cli program is inspired by [wolviecb](https://github.com/wolviecb) that made bash script that analyses elb/access logs. + ## Installation ### Manual installation - compile it