Skip to content

Commit

Permalink
Merge pull request #5 from dbgeek/0.1.0
Browse files Browse the repository at this point in the history
0.1.0 - tail command
  • Loading branch information
dbgeek committed Mar 17, 2019
2 parents 318118b + 3776af0 commit 79926b5
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 9 deletions.
3 changes: 2 additions & 1 deletion .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ changelog:
# Default is empty
exclude:
- '^docs:'
- '^ci:'
- '^ci:'
- '^Merge'
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

`elblogcat` is a tool that lists access logs for AWS load balancers and cats them, so you can see their content without having to download them locally and cat them manually.

This CLI program is inspired by [wolviecb](https://github.com/wolviecb), who made a bash script that analyses ELB access logs.

## Installation

### Manual installation - compile it
Expand Down
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func init() {
viper.BindPFlag("start-time", rootCmd.PersistentFlags().Lookup("start-time"))
rootCmd.PersistentFlags().StringP("end-time", "", time.Now().Format("2006-01-02 15:04:05"), "")
viper.BindPFlag("end-time", rootCmd.PersistentFlags().Lookup("end-time"))
rootCmd.PersistentFlags().Int64P("max-keys", "", 500, "control nr of keys that should be return from s3 api for each response.")
viper.BindPFlag("max-keys", rootCmd.PersistentFlags().Lookup("max-keys"))

// Cobra also supports local flags, which will only run
// when this action is called directly.
Expand Down
65 changes: 65 additions & 0 deletions cmd/tail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package cmd

import (
"bytes"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/dbgeek/elblogcat/logcat"
"github.com/dbgeek/elblogcat/logworker"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

// tailCmd implements the "tail" subcommand: it follows the configured S3
// access-log bucket and cats every new access log as it appears, polling at
// the configured interval (default one minute).
var tailCmd = &cobra.Command{
	Use:   "tail",
	Short: "Poll for new access logs (default every 1 min) and cat them",
	Long: `
`,
	Run: func(cmd *cobra.Command, args []string) {
		// NOTE(review): region is hard-coded here; consider sourcing it
		// from configuration like the other settings.
		awsConfiguration := logworker.AWSconfiguration{Region: "eu-west-1"}
		configuration := logworker.NewConfiguration()
		accessLogFilter := logworker.NewAccessLogFilter()
		client := logworker.NewLogWorker(
			&awsConfiguration,
			&configuration,
			&accessLogFilter,
		)
		logs := make(chan string, 1)

		// Tail starts a background goroutine that sends each new access-log
		// name on the channel. The channel is never closed, so the loop
		// below runs until the process is terminated.
		client.Tail(logs)

		for v := range logs {
			buff := &aws.WriteAtBuffer{}
			key := fmt.Sprintf("%s%s", accessLogFilter.AccesslogPath(configuration.Prefix), v)
			_, err := client.S3Downloader.Download(buff, &s3.GetObjectInput{
				Bucket: aws.String(viper.GetString("s3-bucket")),
				Key:    aws.String(key),
			})
			if err != nil {
				logworker.Logger.Fatalf("Failed to Download key: %v from s3. Got error: %v",
					key,
					err)
			}

			// Apply the row filter and print the requested fields, exactly
			// like the cat subcommand does for a single log.
			rowFilter := logcat.NewRowFilter()
			content := bytes.NewBuffer(buff.Bytes())
			accessLog := logcat.Accesslog{
				Content:     content,
				RowFilter:   rowFilter,
				PrintFields: viper.GetString("fields"),
			}
			accessLog.Cat()
		}
	},
}

// init wires the tail subcommand into the root command and registers its
// polling-interval flag (how often the bucket is polled for new logs).
func init() {
	rootCmd.AddCommand(tailCmd)
	tailCmd.PersistentFlags().Duration("polling-interval", time.Minute, "")
	viper.BindPFlag("polling-interval", tailCmd.PersistentFlags().Lookup("polling-interval"))
}
106 changes: 98 additions & 8 deletions logworker/logworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
)

type (
// LogWorker worker
LogWorker struct {
Config *AWSconfiguration
S3 *s3.S3
Expand All @@ -30,10 +31,14 @@ type (
Region string
Profile string
}
// Configuration hold the configuration that is needed.
Configuration struct {
Bucket string
Prefix string
Bucket string
Prefix string
PollingInterval time.Duration
MaxKeys int64
}
// AccessLogFilter ..
AccessLogFilter struct {
matchString string
AwsAccountID string
Expand All @@ -47,7 +52,12 @@ type (
}
)

const (
accessLogEndTimeFormat string = "20060102T1504Z"
)

var (
// Logger instance of logrus.Logger
Logger *logrus.Logger
)

Expand Down Expand Up @@ -80,6 +90,7 @@ func newFilter(accessLogFilter *AccessLogFilter) *regexp.Regexp {
return regexp
}

// NewLogWorker return a pointer of LogWorker
func NewLogWorker(
awsConfiguration *AWSconfiguration,
configuration *Configuration,
Expand Down Expand Up @@ -113,6 +124,7 @@ func NewLogWorker(
return &logWorker
}

// List returns slice of string with accesslog names
func (l *LogWorker) List() []string {

var accessLogs []string
Expand All @@ -133,19 +145,95 @@ func (l *LogWorker) List() []string {
return true
})
if err != nil {
fmt.Println(err)
Logger.Fatalf("listObjectV2Pages return with error: %v", err)
}
return accessLogs
}

// Tail streams the names of newly appearing access logs to logch.
//
// It starts a background goroutine that first backfills every log from the
// filter's StartTime up to now, then polls the bucket every
// Configuration.PollingInterval for logs whose end timestamp matches the
// current poll time. Each log name is sent at most once. The goroutine runs
// for the lifetime of the process and logch is never closed.
func (l *LogWorker) Tail(logch chan<- string) {
	go func() {
		// Names already sent on logch; prevents duplicates across polls.
		consumedAccessLogs := make(map[string]struct{})

		// Backfill in 5-minute steps from the configured start time up to
		// now (the step matches the timestamp granularity of the log names
		// built below).
		for t := l.AccessLogFilter.StartTime; t.Before(time.Now().UTC()); t = t.Add(5 * time.Minute) {
			l.emitNewAccessLogs(l.accessLogPrefix(t), consumedAccessLogs, logch)
		}

		// time.NewTicker instead of time.Tick so the ticker can be stopped
		// (staticcheck SA1015). Today the loop never exits, so Stop is
		// unreachable, but the cleanup is correct if that ever changes.
		ticker := time.NewTicker(l.Configuration.PollingInterval)
		defer ticker.Stop()
		for now := range ticker.C {
			l.emitNewAccessLogs(l.accessLogPrefix(now.UTC()), consumedAccessLogs, logch)

			// Prune bookkeeping for logs older than two minutes so the
			// seen-set does not grow without bound.
			for k := range consumedAccessLogs {
				parts := strings.Split(k, "_")
				if parts == nil || len(parts) < 5 {
					continue // unexpected name shape; keep entry rather than panic
				}
				ts, err := time.Parse(accessLogEndTimeFormat, parts[4])
				if err != nil {
					continue // unparseable timestamp; keep the entry
				}
				if ts.Before(now.UTC().Add(-2 * time.Minute)) {
					delete(consumedAccessLogs, k)
				}
			}
		}
	}()
}

// accessLogPrefix builds the S3 key prefix for access logs whose end
// timestamp is t, using the worker's own filter (account, region, LB id).
func (l *LogWorker) accessLogPrefix(t time.Time) string {
	name := fmt.Sprintf("%s_elasticloadbalancing_%s_%s_%s",
		l.AccessLogFilter.AwsAccountID,
		l.AccessLogFilter.Region,
		l.AccessLogFilter.LoadBalancerID,
		t.Format(accessLogEndTimeFormat),
	)
	return filepath.Join(l.AccessLogFilter.AccesslogPath(l.Configuration.Prefix), name)
}

// emitNewAccessLogs lists the logs under s3Prefix and sends every name not
// yet present in seen on logch, recording it in seen.
func (l *LogWorker) emitNewAccessLogs(s3Prefix string, seen map[string]struct{}, logch chan<- string) {
	for _, accessLog := range *l.listAccessLogs(s3Prefix) {
		if _, ok := seen[accessLog]; !ok {
			seen[accessLog] = struct{}{}
			logch <- accessLog
		}
	}
}

// listAccessLogs returns the object names (final path component of each key)
// found under s3Prefix in the configured bucket.
//
// Listing is paged via ListObjectsV2Pages with Configuration.MaxKeys per
// page; on an API error the process exits via Logger.Fatalf, matching the
// error handling used by List.
func (l *LogWorker) listAccessLogs(s3Prefix string) *[]string {
	var al []string
	input := &s3.ListObjectsV2Input{
		Bucket:    aws.String(l.Configuration.Bucket),
		Prefix:    aws.String(s3Prefix),
		Delimiter: aws.String("/"),
		MaxKeys:   aws.Int64(l.Configuration.MaxKeys),
	}
	err := l.S3.ListObjectsV2Pages(input,
		func(page *s3.ListObjectsV2Output, lastPage bool) bool {
			for _, val := range page.Contents {
				// Split once (the original split the key twice per object)
				// and keep only the object name after the last "/".
				parts := strings.Split(*val.Key, "/")
				al = append(al, parts[len(parts)-1])
			}
			return true
		})
	if err != nil {
		Logger.Fatalf("listObjectV2Pages return with error: %v", err)
	}
	return &al
}

// AccesslogPath returns the S3 key prefix (always ending in "/") under which
// the access logs for this filter's account, region and start date live,
// rooted at the supplied bucket prefix.
func (a *AccessLogFilter) AccesslogPath(prefix string) string {
	datePath := a.StartTime.Format("2006/01/02")
	sub := fmt.Sprintf("AWSLogs/%s/elasticloadbalancing/%s/%s/", a.AwsAccountID, a.Region, datePath)
	return filepath.Join(prefix, sub) + "/"
}

func (a *AccessLogFilter) filterByTime(accessLog string) bool {
accessLogEndTimeStr := strings.Split(accessLog, "_")[4]
accessLogEndTimeStamp, err := time.Parse("20060102T1504Z", accessLogEndTimeStr)
accessLogEndTimeStamp, err := time.Parse(accessLogEndTimeFormat, accessLogEndTimeStr)
if err != nil {
Logger.Fatalf("failed to parse timestamp for accesslog name")
}
Expand All @@ -171,17 +259,16 @@ func (a *AccessLogFilter) filterByTime(accessLog string) bool {
return false
}

// NewAccessLogFilter Return AccessLogFilter
func NewAccessLogFilter() AccessLogFilter {

startTime, err := time.Parse("2006-01-02 15:04:05", viper.GetString("start-time"))
if err != nil {
Logger.Fatalf("Failed to parse start time. Gott error: %v", err)
fmt.Println("failed to parse starttime")
}
endTime, err := time.Parse("2006-01-02 15:04:05", viper.GetString("end-time"))
if err != nil {
Logger.Fatalf("Failed to parse end time. Gott error: %v", err)
fmt.Println("failed to parse endtime")
}
accessLogFilter := AccessLogFilter{}
accessLogFilter.AwsAccountID = viper.GetString("aws-account-id")
Expand All @@ -195,9 +282,12 @@ func NewAccessLogFilter() AccessLogFilter {
return accessLogFilter
}

// NewConfiguration builds a Configuration from the viper-bound settings
// (s3-bucket, s3-prefix, polling-interval, max-keys).
//
// The displayed source contained duplicate Bucket/Prefix keys (a diff
// artifact of old and new lines shown together), which cannot compile; this
// is the post-change struct literal.
func NewConfiguration() Configuration {
	return Configuration{
		Bucket:          viper.GetString("s3-bucket"),
		Prefix:          viper.GetString("s3-prefix"),
		PollingInterval: viper.GetDuration("polling-interval"),
		MaxKeys:         viper.GetInt64("max-keys"),
	}
}

0 comments on commit 79926b5

Please sign in to comment.