Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support pgzip as an alternative (de)compression implementation #5108

Merged
merged 4 commits into from
Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Note that these environment variables may be removed at any time.
| `ARCHIVED_WORKFLOW_GC_PERIOD` | `time.Duration` | The periodicity for GC of archived workflows. |
| `ARGO_TRACE` | `bool` | Whether to enable tracing statements in Argo components. |
| `DEFAULT_REQUEUE_TIME` | `time.Duration` | The requeue time for the rate limiter of the workflow queue. |
| `GZIP_IMPLEMENTATION` | `string` | The gzip implementation used for compression and decompression. Currently only "PGZip" and "GZip" are supported. Defaults to "PGZip". |
| `LEADER_ELECTION_IDENTITY` | `string` | The ID used for workflow controllers to elect a leader. |
| `MAX_OPERATION_TIME` | `time.Duration` | The maximum time a workflow operation is allowed to run for before requeuing the workflow onto the work queue. |
| `OFFLOAD_NODE_STATUS_TTL` | `time.Duration` | The TTL to delete the offloaded node status. Currently only used for testing. |
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/imkira/go-interpol v1.1.0 // indirect
github.com/klauspost/pgzip v1.2.5
alexec marked this conversation as resolved.
Show resolved Hide resolved
alexec marked this conversation as resolved.
Show resolved Hide resolved
github.com/mattn/goreman v0.3.7
github.com/minio/minio-go/v7 v7.0.2
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,8 @@ github.com/klauspost/compress v1.10.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE=
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
39 changes: 33 additions & 6 deletions util/file/fileutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,22 @@ import (
"bytes"
"compress/gzip"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"strings"

"github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
"k8s.io/utils/env"
)

var gzipImpl = env.GetString(GZipImplEnvVarKey, PGZIP)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not know about this library - neat

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea it's super fast and works great for large workflows.


const (
GZipImplEnvVarKey = "GZIP_IMPLEMENTATION"
GZIP = "GZip"
PGZIP = "PGZip"
)

type TarReader interface {
Expand Down Expand Up @@ -66,20 +77,36 @@ func DecodeDecompressString(content string) (string, error) {
// CompressContent compresses the byte array using the gzip writer selected by GZIP_IMPLEMENTATION
func CompressContent(content []byte) []byte {
var buf bytes.Buffer
zipWriter := gzip.NewWriter(&buf)
var gzipWriter io.WriteCloser
switch gzipImpl {
case GZIP:
gzipWriter = gzip.NewWriter(&buf)
default:
gzipWriter = pgzip.NewWriter(&buf)
}

_, err := zipWriter.Write(content)
_, err := gzipWriter.Write(content)
if err != nil {
log.Warnf("Error in compressing: %v", err)
}
close(zipWriter)
close(gzipWriter)
return buf.Bytes()
}

// DecompressContent will return the uncompressed content
func DecompressContent(content []byte) ([]byte, error) {
buf := bytes.NewReader(content)
gZipReader, _ := gzip.NewReader(buf)
defer close(gZipReader)
return ioutil.ReadAll(gZipReader)
var err error
var gzipReader io.ReadCloser
switch gzipImpl {
case GZIP:
gzipReader, err = gzip.NewReader(buf)
default:
gzipReader, err = pgzip.NewReader(buf)
}
if err != nil {
return nil, fmt.Errorf("failed to decompress: %w", err)
}
defer close(gzipReader)
return ioutil.ReadAll(gzipReader)
}
18 changes: 11 additions & 7 deletions util/file/fileutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@ import (

// TestCompressContentString ensures compressing then decompressing a content string works as expected
func TestCompressContentString(t *testing.T) {
content := "{\"pod-limits-rrdm8-591645159\":{\"id\":\"pod-limits-rrdm8-591645159\",\"name\":\"pod-limits-rrdm8[0]." +
"run-pod(0:0)\",\"displayName\":\"run-pod(0:0)\",\"type\":\"Pod\",\"templateName\":\"run-pod\",\"phase\":" +
"\"Succeeded\",\"boundaryID\":\"pod-limits-rrdm8\",\"startedAt\":\"2019-03-07T19:14:50Z\",\"finishedAt\":" +
"\"2019-03-07T19:14:55Z\"}}"
for _, gzipImpl := range []string{file.GZIP, file.PGZIP} {
_ = os.Setenv(file.GZipImplEnvVarKey, gzipImpl)
content := "{\"pod-limits-rrdm8-591645159\":{\"id\":\"pod-limits-rrdm8-591645159\",\"name\":\"pod-limits-rrdm8[0]." +
"run-pod(0:0)\",\"displayName\":\"run-pod(0:0)\",\"type\":\"Pod\",\"templateName\":\"run-pod\",\"phase\":" +
"\"Succeeded\",\"boundaryID\":\"pod-limits-rrdm8\",\"startedAt\":\"2019-03-07T19:14:50Z\",\"finishedAt\":" +
"\"2019-03-07T19:14:55Z\"}}"

compString := file.CompressEncodeString(content)
compString := file.CompressEncodeString(content)

resultString, _ := file.DecodeDecompressString(compString)
resultString, _ := file.DecodeDecompressString(compString)

assert.Equal(t, content, resultString)
assert.Equal(t, content, resultString)
}
_ = os.Unsetenv(file.GZipImplEnvVarKey)
}

func TestExistsInTar(t *testing.T) {
Expand Down