Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add intel/fastgo compression option #1114

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/bytedance/sonic v1.8.1
github.com/cloudwego/netpoll v0.5.0
github.com/fsnotify/fsnotify v1.5.4
github.com/intel/fastgo v1.0.1
github.com/tidwall/gjson v1.14.4
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ github.com/henrylee2cn/ameda v1.4.10 h1:JdvI2Ekq7tapdPsuhrc4CaFiqw6QXFvZIULWJgQy
github.com/henrylee2cn/ameda v1.4.10/go.mod h1:liZulR8DgHxdK+MEwvZIylGnmcjzQ6N6f2PlWe7nEO4=
github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8 h1:yE9ULgp02BhYIrO6sdV/FPe0xQM6fNHkVQW2IAymfM0=
github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8/go.mod h1:Nhe/DM3671a5udlv2AdV2ni/MZzgfv2qrPL5nIi3EGQ=
github.com/intel/fastgo v1.0.1 h1:Bie0KlsOOuZ7j2cdqmPRHmtPgwoFG/y54J3+lWDxPMw=
github.com/intel/fastgo v1.0.1/go.mod h1:sctwebRi3SXzcojadNKwCgZgKy/Y/pSzSUR0aFF+cPg=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
Expand Down
64 changes: 57 additions & 7 deletions pkg/common/compress/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,33 @@ import (
"compress/gzip"
"fmt"
"io"
"os"
"strconv"
"sync"

"github.com/intel/fastgo"
igzip "github.com/intel/fastgo/compress/gzip"

"github.com/cloudwego/hertz/pkg/common/bytebufferpool"
"github.com/cloudwego/hertz/pkg/common/stackless"
"github.com/cloudwego/hertz/pkg/common/utils"
"github.com/cloudwego/hertz/pkg/network"
)

var fastgoEnable bool

func init() {
if fastgo.Optimized() {
fastgoEnable, _ = strconv.ParseBool(os.Getenv("HERTZ_EXP_FASTGO_ENABLE"))
}
}

const CompressDefaultCompression = 6 // flate.DefaultCompression

var gzipReaderPool sync.Pool
var (
readerPool sync.Pool
gzipReaderPool sync.Pool
)

var (
stacklessGzipWriterPoolMap = newCompressWriterPoolMap()
Expand All @@ -75,6 +91,13 @@ func newCompressWriterPoolMap() []*sync.Pool {
return m
}

type Writer = stackless.Writer

type Reader interface {
Reset(io.Reader) error
io.ReadCloser
}

type compressCtx struct {
w io.Writer
p []byte
Expand All @@ -101,13 +124,13 @@ func (w *byteSliceWriter) Write(p []byte) (int, error) {
// bytes written to w.
func WriteGunzip(w io.Writer, p []byte) (int, error) {
r := &byteSliceReader{p}
zr, err := AcquireGzipReader(r)
zr, err := acquireGzipReader(r)
if err != nil {
return 0, err
}
zw := network.NewWriter(w)
n, err := utils.CopyZeroAlloc(zw, zr)
ReleaseGzipReader(zr)
releaseGzipReader(zr)
nn := int(n)
if int64(nn) != n {
return 0, fmt.Errorf("too much data gunzipped: %d", n)
Expand All @@ -128,6 +151,27 @@ func (r *byteSliceReader) Read(p []byte) (int, error) {
return n, nil
}

func acquireGzipReader(r io.Reader) (Reader, error) {
cocotyty marked this conversation as resolved.
Show resolved Hide resolved
v := readerPool.Get()
if v == nil {
if fastgoEnable {
return igzip.NewReader(r)
}

return gzip.NewReader(r)
}
zr := v.(Reader)
if err := zr.Reset(r); err != nil {
return nil, err
}
return zr, nil
}

func releaseGzipReader(zr Reader) {
zr.Close()
readerPool.Put(zr)
}

func AcquireGzipReader(r io.Reader) (*gzip.Reader, error) {
v := gzipReaderPool.Get()
if v == nil {
Expand Down Expand Up @@ -180,25 +224,31 @@ func nonblockingWriteGzip(ctxv interface{}) {
releaseRealGzipWriter(zw, ctx.level)
}

func releaseRealGzipWriter(zw *gzip.Writer, level int) {
func releaseRealGzipWriter(zw Writer, level int) {
zw.Close()
nLevel := normalizeCompressLevel(level)
p := realGzipWriterPoolMap[nLevel]
p.Put(zw)
}

func acquireRealGzipWriter(w io.Writer, level int) *gzip.Writer {
func acquireRealGzipWriter(w io.Writer, level int) Writer {
nLevel := normalizeCompressLevel(level)
p := realGzipWriterPoolMap[nLevel]
v := p.Get()
if v == nil {
zw, err := gzip.NewWriterLevel(w, level)
var zw Writer
var err error
if fastgoEnable && level <= 2 {
zw, err = igzip.NewWriterLevel(w, level)
} else {
zw, err = gzip.NewWriterLevel(w, level)
}
if err != nil {
panic(fmt.Sprintf("BUG: unexpected error from gzip.NewWriterLevel(%d): %s", level, err))
}
return zw
}
zw := v.(*gzip.Writer)
zw := v.(Writer)
zw.Reset(w)
return zw
}
Expand Down
Loading