From 1c335c81e0864f45234818e9db9066fbd3e7149a Mon Sep 17 00:00:00 2001 From: wangxinalex Date: Tue, 29 Mar 2022 21:44:29 +0800 Subject: [PATCH 1/4] fix: fix the bytes encode/decode for redis cache Using `string(data)` to convert the byte array to string introduces error in json marshal/unmarshal, hence causes error when returning cached response from redis. The reason is `Unmarshal` function in `encode/json` would replace invalid UTF-8 or invalid UTF-16 pairs with `U+FFFD`, therefore the `payload` string in `redisCachePayload` will actually change after json marshal/unmarshal since the byte array may contain invalid UTF-8/UTF-16 byte, the length of payload will thereby change, resulting the http server to find the declared length in header `Content-Length` mismatches the actual length of payload. The fix is to base64-encode/decode the byte array to string, thereby eliminates invalid UTF-8/UTF-16 bytes. --- .gitignore | 1 + cache/redis_cache.go | 10 ++++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index eb3d3857..c6653ad5 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ /docs/.output /docs/.nuxt /docs/static/sw.js +.idea \ No newline at end of file diff --git a/cache/redis_cache.go b/cache/redis_cache.go index ba645f96..aec3e31e 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -3,6 +3,7 @@ package cache import ( "bytes" "context" + "encoding/base64" "encoding/json" "github.com/contentsquare/chproxy/config" "github.com/contentsquare/chproxy/log" @@ -118,13 +119,17 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { log.Errorf("Not able to fetch TTL for: %s ", key) } + decoded, err := base64.StdEncoding.DecodeString(payload.Payload) + if err != nil { + log.Errorf("Not able to decode for: %s ", payload.Payload) + } value := &CachedData{ ContentMetadata: ContentMetadata{ Length: payload.Length, Type: payload.Type, Encoding: payload.Encoding, }, - Data: bytes.NewReader([]byte(payload.Payload)), + Data: bytes.NewReader(decoded), Ttl: ttl, } @@ -137,8 +142,9 @@ func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key return 0, err } + encoded := base64.StdEncoding.EncodeToString(data) payload := &redisCachePayload{ - Length: contentMetadata.Length, Type: contentMetadata.Type, Encoding: contentMetadata.Encoding, Payload: string(data), + Length: contentMetadata.Length, Type: contentMetadata.Type, Encoding: contentMetadata.Encoding, Payload: encoded, } marshalled, err := json.Marshal(payload) From 648df1b077f901b1aa1119c500bdff7b6aecb6cc Mon Sep 17 00:00:00 2001 From: wangxinalex Date: Thu, 31 Mar 2022 23:32:31 +0800 Subject: [PATCH 2/4] fix: add test case about encode/decode the cached value add test cases for base64 encode/decode the cached value --- cache/redis_cache.go | 9 ++++---- main_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/cache/redis_cache.go b/cache/redis_cache.go index aec3e31e..e1e621ce 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -24,7 +24,7 @@ const getTimeout = 1 * time.Second const putTimeout = 2 * time.Second const statsTimeout = 500 * time.Millisecond -type redisCachePayload struct { +type RedisCachePayload struct { Length int64 `json:"l"` Type string `json:"t"` Encoding string `json:"enc"` @@ -105,7 +105,7 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { return nil, ErrMissing } - var payload redisCachePayload + var payload RedisCachePayload err = json.Unmarshal([]byte(val), &payload) if err != nil { @@ -121,7 +121,8 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { decoded, err := base64.StdEncoding.DecodeString(payload.Payload) if err != nil { - log.Errorf("Not able to decode for: %s ", payload.Payload) + log.Errorf("failed to decode payload: %s , due to: %v ", payload.Payload, err) + return nil, ErrMissing } value := &CachedData{ ContentMetadata: ContentMetadata{ @@ -143,7 +144,7 @@ func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key } encoded := base64.StdEncoding.EncodeToString(data) - payload := &redisCachePayload{ + payload := &RedisCachePayload{ Length: contentMetadata.Length, Type: contentMetadata.Type, Encoding: contentMetadata.Encoding, Payload: encoded, } diff --git a/main_test.go b/main_test.go index 1810e2cb..3e8eeca2 100644 --- a/main_test.go +++ b/main_test.go @@ -5,6 +5,8 @@ import ( "compress/gzip" "context" "crypto/tls" + "encoding/base64" + "encoding/json" "fmt" "github.com/contentsquare/chproxy/cache" "io" @@ -19,9 +21,9 @@ import ( "testing" "time" + "github.com/alicebob/miniredis/v2" "github.com/contentsquare/chproxy/config" "github.com/contentsquare/chproxy/log" - "github.com/alicebob/miniredis/v2" ) var testDir = "./temp-test-data" @@ -365,7 +367,7 @@ func TestServe(t *testing.T) { str, err := redisClient.Get(key.String()) checkErr(t, err) - if !strings.Contains(str, "Ok") || !strings.Contains(str, "text/plain") || !strings.Contains(str, "charset=utf-8") { + if !strings.Contains(str, base64.StdEncoding.EncodeToString([]byte("Ok."))) || !strings.Contains(str, "text/plain") || !strings.Contains(str, "charset=utf-8") { t.Fatalf("result from cache query is wrong: %s", str) } @@ -376,6 +378,50 @@ func TestServe(t *testing.T) { }, startHTTP, }, + { + "http requests with caching in redis (testcase for base64 encoding/decoding)", + "testdata/http.cache.redis.yml", + func(t *testing.T) { + redisClient.FlushAll() + q := "SELECT 1 FORMAT TabSeparatedWithNamesAndTypes" + req, err := http.NewRequest("GET", "http://127.0.0.1:9090?query="+url.QueryEscape(q), nil) + checkErr(t, err) + + resp := httpRequest(t, req, http.StatusOK) + checkHttpResponse(t, resp, string(bytesWithInvalidUTFPairs)) + resp2 := httpRequest(t, req, http.StatusOK) + // if we do not use base64 to encode/decode the cached payload, EOF error will be thrown here. + checkHttpResponse(t, resp2, string(bytesWithInvalidUTFPairs)) + keys := redisClient.Keys() + if len(keys) != 1 { + t.Fatalf("unexpected amount of keys in redis: %v", len(keys)) + } + + // check cached response + key := &cache.Key{ + Query: []byte(q), + AcceptEncoding: "gzip", + Version: cache.Version, + } + str, err := redisClient.Get(key.String()) + checkErr(t, err) + + var unMarshaledPayload cache.RedisCachePayload + err = json.Unmarshal([]byte(str), &unMarshaledPayload) + checkErr(t, err) + if unMarshaledPayload.Payload != base64.StdEncoding.EncodeToString(bytesWithInvalidUTFPairs) { + t.Fatalf("result from cache query is wrong: %s", str) + } + decoded, err := base64.StdEncoding.DecodeString(unMarshaledPayload.Payload) + checkErr(t, err) + + if unMarshaledPayload.Length != int64(len(decoded)) { + t.Fatalf("the declared length %d and actual length %d is not same", unMarshaledPayload.Length, len(decoded)) + } + }, + startHTTP, + }, + { "http gzipped POST request", "testdata/http.cache.yml", @@ -706,6 +752,9 @@ func fakeCHHandler(w http.ResponseWriter, r *http.Request) { fakeCHState.sleep() fmt.Fprint(w, "bar") + case "SELECT 1 FORMAT TabSeparatedWithNamesAndTypes": + w.WriteHeader(http.StatusOK) + w.Write(bytesWithInvalidUTFPairs) default: if strings.Contains(string(query), killQueryPattern) { fakeCHState.kill() @@ -715,6 +764,8 @@ func fakeCHHandler(w http.ResponseWriter, r *http.Request) { } } +var bytesWithInvalidUTFPairs = []byte{239, 191, 189, 1, 32, 50, 239, 191} + var fakeCHState = &stateCH{ syncCH: make(chan struct{}), } From 43ac559aadfb9a4c834a7f8e2a9615103bc19553 Mon Sep 17 00:00:00 2001 From: wangxinalex Date: Sat, 2 Apr 2022 09:15:06 +0800 Subject: [PATCH 3/4] fix: adjust the waiting time of `queue_overflow_for_user` case to pass ci minimize the waiting time between two consecutive requests --- proxy_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy_test.go b/proxy_test.go index 9c656013..7dfc7ca6 100644 --- a/proxy_test.go +++ b/proxy_test.go @@ -251,9 +251,9 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) { p.users["default"].maxConcurrentQueries = 1 p.users["default"].queueCh = make(chan struct{}, 1) go makeHeavyRequest(p, time.Millisecond*20) - time.Sleep(time.Millisecond * 5) + time.Sleep(time.Millisecond * 1) // in case ci runner is slow go makeHeavyRequest(p, time.Millisecond*20) - time.Sleep(time.Millisecond * 5) + time.Sleep(time.Millisecond * 1) return makeHeavyRequest(p, time.Millisecond*20) }, }, From 4219d9ec8a3f415dbb31184e9d67ff40580ce579 Mon Sep 17 00:00:00 2001 From: wangxinalex Date: Sun, 3 Apr 2022 14:52:52 +0800 Subject: [PATCH 4/4] fix: make the `cache.redisCachePayload` private again --- cache/redis_cache.go | 6 +++--- main_test.go | 9 ++++++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/cache/redis_cache.go b/cache/redis_cache.go index e1e621ce..4252601e 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -24,7 +24,7 @@ const getTimeout = 1 * time.Second const putTimeout = 2 * time.Second const statsTimeout = 500 * time.Millisecond -type RedisCachePayload struct { +type redisCachePayload struct { Length int64 `json:"l"` Type string `json:"t"` Encoding string `json:"enc"` @@ -105,7 +105,7 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { return nil, ErrMissing } - var payload RedisCachePayload + var payload redisCachePayload err = json.Unmarshal([]byte(val), &payload) if err != nil { @@ -144,7 +144,7 @@ func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key } encoded := base64.StdEncoding.EncodeToString(data) - payload := &RedisCachePayload{ + payload := &redisCachePayload{ Length: contentMetadata.Length, Type: contentMetadata.Type, Encoding: contentMetadata.Encoding, Payload: encoded, } diff --git a/main_test.go b/main_test.go index 3e8eeca2..760f2b2e 100644 --- a/main_test.go +++ b/main_test.go @@ -406,7 +406,14 @@ func TestServe(t *testing.T) { str, err := redisClient.Get(key.String()) checkErr(t, err) - var unMarshaledPayload cache.RedisCachePayload + type redisCachePayload struct { + Length int64 `json:"l"` + Type string `json:"t"` + Encoding string `json:"enc"` + Payload string `json:"payload"` + } + + var unMarshaledPayload redisCachePayload err = json.Unmarshal([]byte(str), &unMarshaledPayload) checkErr(t, err) if unMarshaledPayload.Payload != base64.StdEncoding.EncodeToString(bytesWithInvalidUTFPairs) {