Skip to content

Commit

Permalink
added helper script to calculate object size distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
rem7 committed Jul 8, 2024
1 parent c682a3e commit f060ca7
Show file tree
Hide file tree
Showing 8 changed files with 560 additions and 0 deletions.
3 changes: 3 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package s3tar

import (
Expand Down
3 changes: 3 additions & 0 deletions api_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package s3tar

import (
Expand Down
30 changes: 30 additions & 0 deletions cmd/bucket-dist/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module bucket-dist

go 1.20

require (
github.com/aws/aws-sdk-go-v2/config v1.27.23
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.4
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.0
golang.org/x/sync v0.7.0
)

require (
github.com/aws/aws-sdk-go-v2 v1.30.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.23 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.9 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.13 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.1 // indirect
github.com/aws/smithy-go v1.20.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
)
52 changes: 52 additions & 0 deletions cmd/bucket-dist/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
github.com/aws/aws-sdk-go-v2 v1.30.1 h1:4y/5Dvfrhd1MxRDD77SrfsDaj8kUkkljU7XE83NPV+o=
github.com/aws/aws-sdk-go-v2 v1.30.1/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 h1:tW1/Rkad38LA15X4UQtjXZXNKsCgkshC3EbmcUmghTg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3/go.mod h1:UbnqO+zjqk3uIt9yCACHJ9IVNhyhOCnYk8yA19SAWrM=
github.com/aws/aws-sdk-go-v2/config v1.27.23 h1:Cr/gJEa9NAS7CDAjbnB7tHYb3aLZI2gVggfmSAasDac=
github.com/aws/aws-sdk-go-v2/config v1.27.23/go.mod h1:WMMYHqLCFu5LH05mFOF5tsq1PGEMfKbu083VKqLCd0o=
github.com/aws/aws-sdk-go-v2/credentials v1.17.23 h1:G1CfmLVoO2TdQ8z9dW+JBc/r8+MqyPQhXCafNZcXVZo=
github.com/aws/aws-sdk-go-v2/credentials v1.17.23/go.mod h1:V/DvSURn6kKgcuKEk4qwSwb/fZ2d++FFARtWSbXnLqY=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.9 h1:Aznqksmd6Rfv2HQN9cpqIV/lQRMaIpJkLLaJ1ZI76no=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.9/go.mod h1:WQr3MY7AxGNxaqAtsDWn+fBxmd4XvLkzeqQ8P1VM0/w=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.4 h1:6eKRM6fgeXG4krRO9XKz755vuRhT5UyB9M1W6vjA3JU=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.4/go.mod h1:h0TjcRi+nTob6fksqubKOe+Hra8uqfgmN+vuw4xRwWE=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13 h1:5SAoZ4jYpGH4721ZNoS1znQrhOfZinOhc4XuTXx/nVc=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13/go.mod h1:+rdA6ZLpaSeM7tSg/B0IEDinCIBJGmW8rKDFkYpP04g=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13 h1:WIijqeaAO7TYFLbhsZmi2rgLEAtWOC1LhxCAVTJlSKw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13/go.mod h1:i+kbfa76PQbWw/ULoWnp51EYVWH4ENln76fLQE3lXT8=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.13 h1:THZJJ6TU/FOiM7DZFnisYV9d49oxXWUzsVIMTuf3VNU=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.13/go.mod h1:VISUTg6n+uBaYIWPBaIG0jk7mbBxm7DUqBtU2cUDDWI=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 h1:dT3MqvGhSoaIhRseqw2I0yH81l7wiR2vjs57O51EAm8=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3/go.mod h1:GlAeCkHwugxdHaueRr4nhPuY+WW+gR8UjlcqzPr1SPI=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.15 h1:2jyRZ9rVIMisyQRnhSS/SqlckveoxXneIumECVFP91Y=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.15/go.mod h1:bDRG3m382v1KJBk1cKz7wIajg87/61EiiymEyfLvAe0=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.15 h1:I9zMeF107l0rJrpnHpjEiiTSCKYAIw8mALiXcPsGBiA=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.15/go.mod h1:9xWJ3Q/S6Ojusz1UIkfycgD1mGirJfLLKqq3LPT7WN8=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.13 h1:Eq2THzHt6P41mpjS2sUzz/3dJYFRqdWZ+vQaEMm98EM=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.13/go.mod h1:FgwTca6puegxgCInYwGjmd4tB9195Dd6LCuA+8MjpWw=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.0 h1:4rhV0Hn+bf8IAIUphRX1moBcEvKJipCPmswMCl6Q5mw=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.0/go.mod h1:hdV0NTYd0RwV4FvNKhKUNbPLZoq9CTr/lke+3I7aCAI=
github.com/aws/aws-sdk-go-v2/service/sso v1.22.1 h1:p1GahKIjyMDZtiKoIn0/jAj/TkMzfzndDv5+zi2Mhgc=
github.com/aws/aws-sdk-go-v2/service/sso v1.22.1/go.mod h1:/vWdhoIoYA5hYoPZ6fm7Sv4d8701PiG5VKe8/pPJL60=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.1 h1:lCEv9f8f+zJ8kcFeAjRZsekLd/x5SAm96Cva+VbUdo8=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.1/go.mod h1:xyFHA4zGxgYkdD73VeezHt3vSKEG9EmFnGwoKlP00u4=
github.com/aws/aws-sdk-go-v2/service/sts v1.30.1 h1:+woJ607dllHJQtsnJLi52ycuqHMwlW+Wqm2Ppsfp4nQ=
github.com/aws/aws-sdk-go-v2/service/sts v1.30.1/go.mod h1:jiNR3JqT15Dm+QWq2SRgh0x0bCNSRP2L25+CqPNpJlQ=
github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE=
github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
300 changes: 300 additions & 0 deletions cmd/bucket-dist/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package main

import (
"bytes"
"compress/gzip"
"context"
"encoding/csv"
"encoding/json"
"flag"
"fmt"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"golang.org/x/sync/errgroup"
"io"
"log"
"regexp"
"strconv"
"strings"
"sync/atomic"
"text/tabwriter"
"time"
)

// Package-level state shared by the command: the S3 client and the
// command-line flag values. The flag pointers are assigned in init and only
// hold the user's values once flag.Parse has run in main.
var (
// client is the S3 client used by downloadData; main assigns it.
client *s3.Client
// manifestPath holds the value of the -manifest flag.
manifestPath *string
// threads holds the value of the -threads flag; it is used as the errgroup limit.
threads *int
// withPrefix holds the value of the -prefix flag.
withPrefix *string
// extractS3 captures the bucket (group 1) and the remaining path (group 2)
// of an s3:// URL; it is used by ExtractBucketAndPath.
extractS3 = regexp.MustCompile(`s3://(.[^/]*)/?(.*)`)
)

func init() {
manifestPath = flag.String("manifest", "", "inventory json manifest, s3://bucket/manifest.json")
withPrefix = flag.String("prefix", "", "only count files in the prefix")
threads = flag.Int("threads", 5, "how many goroutines")
}

// Bin describes one bucket of the object-size histogram.
type Bin struct {
	// Name is the label printed in the report; it is also the key used in
	// the per-bin count maps.
	Name string
	// LowerBound is the lower size limit of the bin, in bytes.
	LowerBound int
	// UpperBound is the upper size limit of the bin, in bytes. The last,
	// open-ended entry of BinLimits uses -1.
	UpperBound int
}

// BinLimits lists the histogram bins in ascending size order. The report is
// printed in this order.
var BinLimits = []Bin{
	{Name: "0 bytes", LowerBound: 0, UpperBound: 0},
	{Name: "1 byte to 1 KB", LowerBound: 1, UpperBound: 1024},
	{Name: "1 KB to 2 KB", LowerBound: 1025, UpperBound: 2048},
	{Name: "2 KB to 4 KB", LowerBound: 2049, UpperBound: 4096},
	{Name: "4 KB to 8 KB", LowerBound: 4097, UpperBound: 8192},
	{Name: "8 KB to 16 KB", LowerBound: 8193, UpperBound: 16384},
	{Name: "16 KB to 32 KB", LowerBound: 16385, UpperBound: 32768},
	{Name: "32 KB to 64 KB", LowerBound: 32769, UpperBound: 65536},
	{Name: "64 KB to 128 KB", LowerBound: 65537, UpperBound: 131072},
	{Name: "128 KB to 256 KB", LowerBound: 131073, UpperBound: 262144},
	{Name: "256 KB to 512 KB", LowerBound: 262145, UpperBound: 524288},
	{Name: "512 KB to 1 MB", LowerBound: 524289, UpperBound: 1048576},
	{Name: "1 MB to 2 MB", LowerBound: 1048577, UpperBound: 2097152},
	{Name: "Greater than 2 MB", LowerBound: 2097153, UpperBound: -1},
}

func main() {
flag.Parse()
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
log.Println("error:", err)
return
}
client = s3.NewFromConfig(cfg)
ctx := context.Background()
start := time.Now()

ExtractBucketAndPath(*manifestPath)
fmt.Printf("loading manifest:\t%s\n", *manifestPath)
manifest, err := loadManifest(ctx, *manifestPath)
if err != nil {
panic(err)
}
fmt.Printf("%s\n", manifest.ManifestFileBucket())
fmt.Printf("manifest contains %d CSVs\n", len(manifest.Files))

bins, totalObjects := calculateObjectSizeDistribution(ctx, manifest, *withPrefix, downloadData)
report := printReport(bins, totalObjects)
fmt.Println(report)

fmt.Printf("total objects: %d\n", totalObjects)
fmt.Printf("time elapsed: %s\n", time.Now().Sub(start))
}

// calculateObjectSizeDistribution downloads every CSV file listed in manifest
// (using fetchDataFn), bins the object sizes found in each file and merges the
// per-file results.
//
// Files are processed concurrently, bounded by the -threads flag. It returns
// the merged count per bin name (every name in BinLimits is present) and the
// total number of objects counted. If any file fails to download or parse,
// the error is logged and an empty bin map is returned together with the
// number of objects counted by the files that completed.
func calculateObjectSizeDistribution(ctx context.Context, manifest *ManifestFile, withPrefix string, fetchDataFn func(context.Context, string, string) (io.ReadCloser, error)) (map[string]int, uint64) {
	bucket := manifest.ManifestFileBucket()
	files := manifest.Files
	var totalLines atomic.Uint64

	// errgroup treats a limit of zero as "never start a goroutine", which
	// would block the first g.Go call forever; fall back to a single worker.
	limit := *threads
	if limit == 0 {
		limit = 1
	}

	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(limit)

	// Each goroutine writes only to its own slot, so no locking is needed.
	results := make([]map[string]int, len(files))
	for i, f := range files {
		i, f := i, f // per-iteration copies: go.mod targets Go 1.20
		g.Go(func() error {
			fmt.Printf("bucket: %s\n", bucket)
			fmt.Printf("downloading (%d/%d) s3://%s/%s\n", i+1, len(files), bucket, f.Key)
			csvgzip, err := fetchDataFn(gctx, bucket, f.Key)
			if err != nil {
				return fmt.Errorf("downloading s3://%s/%s: %w", bucket, f.Key, err)
			}
			defer csvgzip.Close()

			res, lineCount, err := binCsvObjects(csvgzip, withPrefix, manifest)
			if err != nil {
				return fmt.Errorf("parsing s3://%s/%s: %w", bucket, f.Key, err)
			}
			results[i] = res
			totalLines.Add(lineCount)
			return nil
		})
	}

	bins := make(map[string]int)
	if err := g.Wait(); err != nil {
		log.Printf("something went wrong: %s", err.Error())
		return bins, totalLines.Load()
	}

	// Seed every bin so the caller sees a zero count rather than a missing key.
	for _, bin := range BinLimits {
		bins[bin.Name] = 0
	}
	for _, result := range results {
		for name, count := range result {
			bins[name] += count
		}
	}

	return bins, totalLines.Load()
}

// determineBin returns the name of the BinLimits entry that size (in bytes)
// falls into.
//
// Upper bounds are inclusive, matching the ranges declared in BinLimits: a
// 1024-byte object belongs to "1 byte to 1 KB", not to "1 KB to 2 KB". A
// negative UpperBound marks the open-ended final bin. Sizes at or below zero
// land in the first bin.
func determineBin(size int) string {
	for _, bin := range BinLimits {
		if bin.UpperBound < 0 || size <= bin.UpperBound {
			return bin.Name
		}
	}
	// Only reached if BinLimits has no open-ended entry.
	return "Greater than 2 MB"
}

// printReport renders the size distribution as an aligned text table and
// returns it as a string.
//
// bins maps a bin name to its object count; rows are emitted in BinLimits
// order and a missing name counts as zero. totalObjects is the denominator
// for the percentage column. When it is zero every percentage is reported as
// 0 instead of dividing by zero.
func printReport(bins map[string]int, totalObjects uint64) string {
	// Width of the progress column; one '#' represents two percent.
	const maxBar = 50

	var reportOutput bytes.Buffer
	w := tabwriter.NewWriter(&reportOutput, 0, 0, 1, ' ', 0)
	fmt.Fprintln(w, "File Size Distribution:")
	fmt.Fprintln(w, "Bin\tCount\tPercentage\tProgress\t")
	fmt.Fprintln(w, "---\t-----\t----------\t--------\t")
	for _, bin := range BinLimits {
		count := bins[bin.Name]

		var percentage float64
		if totalObjects > 0 {
			percentage = float64(count) / float64(totalObjects) * 100
		}

		// Clamp so strings.Repeat can never receive a negative count and the
		// bar never overflows its column.
		barLen := int(percentage / 2)
		if barLen < 0 {
			barLen = 0
		} else if barLen > maxBar {
			barLen = maxBar
		}
		bar := strings.Repeat("#", barLen)

		fmt.Fprintf(w, "%-20s\t%4d\t%6.2f%%\t|%-50s|\t\n", bin.Name, count, percentage, bar)
	}
	w.Flush()
	return reportOutput.String()
}

// binCsvObjects reads a gzip-compressed CSV from r and counts its rows per
// size bin.
//
// The object size is taken from column manifest.SizeColumn. When withPrefix
// is non-empty, rows whose manifest.KeyColumn value does not start with it
// are skipped and not counted. It returns the count per bin name, the number
// of rows counted, and an error if the stream cannot be decompressed or
// parsed, a row has too few columns, or a size is not an integer. On error
// the returned count is 0. r itself is not closed; that remains the caller's
// responsibility.
func binCsvObjects(r io.ReadCloser, withPrefix string, manifest *ManifestFile) (map[string]int, uint64, error) {
	var totalLines uint64
	bins := make(map[string]int)

	gr, err := gzip.NewReader(r)
	if err != nil {
		return bins, 0, err
	}
	defer gr.Close()

	// Number of columns a row must have for the indexing below to be safe.
	// The key column is only read when a prefix filter is active.
	minFields := manifest.SizeColumn + 1
	if withPrefix != "" && manifest.KeyColumn >= minFields {
		minFields = manifest.KeyColumn + 1
	}

	csvreader := csv.NewReader(gr)
	for {
		record, err := csvreader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return bins, 0, err
		}
		if len(record) < minFields {
			return bins, 0, fmt.Errorf("inventory row has %d columns, need at least %d", len(record), minFields)
		}
		if withPrefix != "" && !strings.HasPrefix(record[manifest.KeyColumn], withPrefix) {
			continue
		}
		totalLines++

		rawSize := record[manifest.SizeColumn]
		objectSize, err := strconv.Atoi(rawSize)
		if err != nil {
			return bins, 0, fmt.Errorf("parsing object size %q: %w", rawSize, err)
		}
		bins[determineBin(objectSize)]++
	}
	return bins, totalLines, nil
}

func downloadData(ctx context.Context, bucket, key string) (io.ReadCloser, error) {
output, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &key,
})
if err != nil {
return nil, err
}
return output.Body, nil

}

// ManifestFile mirrors the JSON manifest of an S3 Inventory report.
type ManifestFile struct {
	SourceBucket      string    `json:"sourceBucket"`
	DestinationBucket string    `json:"destinationBucket"`
	Version           string    `json:"version"`
	CreationTimestamp string    `json:"creationTimestamp"`
	FileFormat        string    `json:"fileFormat"`
	FileSchema        string    `json:"fileSchema"`
	Files             []CSVFile `json:"files"`
	// SizeColumn and KeyColumn are the zero-based CSV column indexes of the
	// object size and object key. loadManifest derives them from FileSchema.
	SizeColumn int
	KeyColumn  int
}

// ManifestFileBucket returns the bucket name portion of DestinationBucket.
//
// DestinationBucket is expected to be a bucket ARN such as
// "arn:aws:s3:::my-bucket" (NOTE(review): confirm against the inventory
// manifest format). Bucket names cannot contain ':', so the name is
// everything after the last colon. This also works for other partitions
// ("arn:aws-cn:s3:::...") and returns the value unchanged if it contains no
// colon, instead of slicing at a fixed offset and panicking on short input.
func (m *ManifestFile) ManifestFileBucket() string {
	if i := strings.LastIndex(m.DestinationBucket, ":"); i >= 0 {
		return m.DestinationBucket[i+1:]
	}
	return m.DestinationBucket
}

// CSVFile is one entry of the manifest's "files" list.
type CSVFile struct {
	Key         string `json:"key"`
	Size        int    `json:"size"`
	MD5Checksum string `json:"MD5checksum"`
}

// loadManifest downloads the inventory manifest at path (an
// s3://bucket/key URL), decodes it and resolves the CSV column positions of
// the size and key attributes from the manifest's fileSchema.
//
// It returns an error if path cannot be split into a bucket and key, the
// download or JSON decoding fails, or the schema lacks either attribute.
func loadManifest(ctx context.Context, path string) (*ManifestFile, error) {
	bucket, key := ExtractBucketAndPath(path)
	if bucket == "" || key == "" {
		return nil, fmt.Errorf("invalid manifest location %q, expected s3://bucket/key", path)
	}

	r, err := downloadData(ctx, bucket, key)
	if err != nil {
		return nil, fmt.Errorf("downloading manifest: %w", err)
	}
	// Release the response body once the manifest has been read.
	defer r.Close()

	data, err := io.ReadAll(r)
	if err != nil {
		return nil, fmt.Errorf("reading manifest: %w", err)
	}

	jdata := new(ManifestFile)
	if err := json.Unmarshal(data, jdata); err != nil {
		return nil, fmt.Errorf("decoding manifest: %w", err)
	}

	fileSchemaList := strings.Split(jdata.FileSchema, ",")

	sizeColumn, err := getAttrPosition(fileSchemaList, "size")
	if err != nil {
		return nil, err
	}
	keyColumn, err := getAttrPosition(fileSchemaList, "key")
	if err != nil {
		return nil, err
	}
	jdata.SizeColumn = sizeColumn
	jdata.KeyColumn = keyColumn

	return jdata, nil
}

// getAttrPosition returns the index of the first entry in list that contains
// attr, compared case-insensitively.
//
// If no entry matches it returns -1 and an error naming the attribute that
// was looked up.
func getAttrPosition(list []string, attr string) (int, error) {
	needle := strings.ToLower(attr)
	for i, field := range list {
		if strings.Contains(strings.ToLower(field), needle) {
			return i, nil
		}
	}
	return -1, fmt.Errorf("%s attribute not found in S3 Inventory manifest", attr)
}

// ExtractBucketAndPath splits an s3://bucket/prefix/key URL into its bucket
// and path using the package-level extractS3 pattern. Both results are empty
// when s3url does not match the pattern.
func ExtractBucketAndPath(s3url string) (bucket string, path string) {
	// match[0] is the whole match, match[1] the bucket, match[2] the path.
	match := extractS3.FindStringSubmatch(s3url)
	if len(match) < 3 {
		return "", ""
	}
	return match[1], match[2]
}
Loading

0 comments on commit f060ca7

Please sign in to comment.