Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions sdk/go-doris-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,29 @@ GroupCommit: doris.OFF, // Off, use traditional mode

> ⚠️ **Note**: When Group Commit is enabled, all Label configurations are automatically ignored and warning logs are recorded.

### Gzip Compression

```go
config := &doris.Config{
Endpoints: []string{"http://127.0.0.1:8030"},
User: "root",
Password: "password",
Database: "test_db",
Table: "users",
Format: doris.DefaultCSVFormat(), // works with both CSV and JSON formats
Retry: doris.DefaultRetry(),
GroupCommit: doris.OFF,
EnableGzip: true, // SDK compresses the body with gzip and sets compress_type=gz header automatically
}

client, _ := doris.NewLoadClient(config)

data := "1,Alice,25\n2,Bob,30\n3,Charlie,35"
response, err := client.Load(doris.StringReader(data))
```

> **Note**: The SDK compresses the request body transparently — no need to pre-compress the data. Whether JSON compression is supported depends on the Doris version.

## 🔄 Concurrent Usage

### Basic Concurrency Example
Expand Down Expand Up @@ -330,6 +353,7 @@ go run cmd/examples/main.go single # Large batch load (100k records)
go run cmd/examples/main.go concurrent # Concurrent load (1M records, 10 workers)
go run cmd/examples/main.go json # JSON load (50k records)
go run cmd/examples/main.go basic # Basic concurrency (5 workers)
go run cmd/examples/main.go gzip # Gzip compressed CSV load
```

## 🛠️ Utility Tools
Expand Down
85 changes: 85 additions & 0 deletions sdk/go-doris-sdk/cmd/compress_bench/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Usage:
// go run cmd/compress_bench/main.go
//
// Generates sample CSV and JSON data at various batch sizes,
// compresses with gzip, and prints original/compressed sizes and ratio.

package main

import (
"bytes"
"compress/gzip"
"fmt"
"strings"
)

func gzipSize(data string) int {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
gz.Write([]byte(data))
gz.Close()
return buf.Len()
}

// buildCSV generates n rows of CSV data
func buildCSV(n int) string {
var sb strings.Builder
for i := 0; i < n; i++ {
fmt.Fprintf(&sb, "%d,User_%d,%d\n", 1000+i, i, 20+i%50)
}
return sb.String()
}

// buildJSON generates n rows of JSON Lines data
func buildJSON(n int) string {
var sb strings.Builder
for i := 0; i < n; i++ {
fmt.Fprintf(&sb, "{\"id\":%d,\"name\":\"User_%d\",\"age\":%d}\n", 1000+i, i, 20+i%50)
}
return sb.String()
}

func printResult(label string, data string) {
original := len(data)
compressed := gzipSize(data)
compressedBy := (1 - float64(compressed)/float64(original)) * 100
fmt.Printf(" %-40s original=%8d B after_gzip=%8d B compressed_by=%.1f%%\n",
label, original, compressed, compressedBy)
}

func main() {
sizes := []int{100, 1000, 10000, 100000, 1000000, 10000000}

fmt.Println("=== Gzip Compression Benchmark ===")
fmt.Println()

fmt.Println("[CSV format]")
for _, n := range sizes {
data := buildCSV(n)
printResult(fmt.Sprintf("%d rows (~%d KB)", n, len(data)/1024+1), data)
}

fmt.Println()
fmt.Println("[JSON format]")
for _, n := range sizes {
data := buildJSON(n)
printResult(fmt.Sprintf("%d rows (~%d KB)", n, len(data)/1024+1), data)
}
}
8 changes: 8 additions & 0 deletions sdk/go-doris-sdk/cmd/examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,23 @@ Available Examples:
concurrent - Production concurrent loading (1,000,000 records across 10 workers)
json - Production JSON data loading (50,000 JSON records)
basic - Basic concurrent loading demo (5 workers)
gzip - Gzip compressed CSV stream load demo
all - Run all examples sequentially

Examples:
go run cmd/examples/main.go single
go run cmd/examples/main.go concurrent
go run cmd/examples/main.go json
go run cmd/examples/main.go basic
go run cmd/examples/main.go gzip
go run cmd/examples/main.go all

Description:
single - Demonstrates single-threaded large batch loading with realistic product data
concurrent - Shows high-throughput concurrent loading with 10 workers processing order data
json - Illustrates JSON Lines format loading with structured user activity data
basic - Simple concurrent example for learning and development
gzip - Shows gzip-compressed CSV loading with automatic compression by the SDK
all - Runs all examples in sequence for comprehensive testing

For more details, see examples/README.md
Expand All @@ -75,6 +78,9 @@ func runExample(name string) {
case "basic":
fmt.Println("Running Basic Concurrent Example...")
examples.RunBasicConcurrentExample()
case "gzip":
fmt.Println("Running Gzip Compression Example...")
examples.GzipExample()
case "all":
fmt.Println("Running All Examples...")
fmt.Println("\n" + strings.Repeat("=", 80))
Expand All @@ -86,6 +92,8 @@ func runExample(name string) {
fmt.Println("\n" + strings.Repeat("=", 80))
examples.RunBasicConcurrentExample()
fmt.Println("\n" + strings.Repeat("=", 80))
examples.GzipExample()
fmt.Println("\n" + strings.Repeat("=", 80))
fmt.Println("All examples completed!")
default:
fmt.Printf("❌ Unknown example: %s\n\n", name)
Expand Down
64 changes: 64 additions & 0 deletions sdk/go-doris-sdk/examples/gzip_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package examples

import (
"fmt"
"strings"

doris "github.com/apache/doris/sdk/go-doris-sdk"
)

func GzipExample() {
config := &doris.Config{
Endpoints: []string{"http://10.16.10.6:48939"},
User: "root",
Password: "",
Database: "test",
Table: "student",
Format: doris.DefaultJSONFormat(),
Retry: doris.DefaultRetry(),
GroupCommit: doris.OFF,
EnableGzip: true,
}

client, err := doris.NewLoadClient(config)
if err != nil {
fmt.Printf("Failed to create client: %v\n", err)
return
}

jsonData := `{"id": 1001, "name": "Alice", "age": 20}
{"id": 1002, "name": "Bob", "age": 22}
{"id": 1003, "name": "Charlie", "age": 19}`

response, err := client.Load(strings.NewReader(jsonData))
if err != nil {
fmt.Printf("Load failed: %v\n", err)
return
}

fmt.Printf("Status: %s\n", response.Status)
if response.Status == doris.SUCCESS {
fmt.Printf("Loaded rows: %d\n", response.Resp.NumberLoadedRows)
fmt.Printf("Load bytes: %d\n", response.Resp.LoadBytes)
} else {
fmt.Printf("Message: %s\n", response.Resp.Message)
fmt.Printf("Error URL: %s\n", response.Resp.ErrorURL)
}
}
20 changes: 20 additions & 0 deletions sdk/go-doris-sdk/pkg/load/client/doris_load_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,26 @@ func (c *DorisLoadClient) Load(reader io.Reader) (*loader.LoadResponse, error) {
}
}

// If gzip is enabled, compress the buffered data once before the retry loop.
// This avoids re-compressing on every retry attempt.
// The result is a *bytes.Reader so Go's HTTP client can replay it on 307 redirects.
if c.config.EnableGzip {
r, err := getBodyFunc()
if err != nil {
return nil, fmt.Errorf("failed to get reader for gzip: %w", err)
}
compressed, err := loader.GzipCompress(r)
if err != nil {
return nil, fmt.Errorf("failed to gzip compress: %w", err)
}
getBodyFunc = func() (io.Reader, error) {
if _, err := compressed.Seek(0, io.SeekStart); err != nil {
return nil, fmt.Errorf("failed to seek compressed data: %w", err)
}
return compressed, nil
}
}

var lastErr error
var response *loader.LoadResponse
startTime := time.Now()
Expand Down
24 changes: 13 additions & 11 deletions sdk/go-doris-sdk/pkg/load/config/load_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,18 @@ type Retry struct {

// Config contains all configuration for stream load operations
type Config struct {
Endpoints []string
User string
Password string
Database string
Table string
LabelPrefix string
Label string
Format Format // Can be &JSONFormat{...} or &CSVFormat{...}
Retry *Retry
GroupCommit GroupCommitMode
Options map[string]string
Endpoints []string
User string
Password string
Database string
Table string
LabelPrefix string
Label string
Format Format // Can be &JSONFormat{...} or &CSVFormat{...}
Retry *Retry
GroupCommit GroupCommitMode
EnableGzip bool // If true, the SDK compresses the request body with gzip and sets compress_type=gz
Options map[string]string
}

// ValidateInternal validates the configuration
Expand All @@ -141,6 +142,7 @@ func (c *Config) ValidateInternal() error {
return fmt.Errorf("format cannot be nil")
}


if c.Retry != nil {
if c.Retry.MaxRetryTimes < 0 {
return fmt.Errorf("maxRetryTimes cannot be negative")
Expand Down
27 changes: 27 additions & 0 deletions sdk/go-doris-sdk/pkg/load/loader/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package load

import (
"bytes"
"compress/gzip"
"encoding/base64"
"fmt"
"io"
Expand Down Expand Up @@ -54,6 +56,22 @@ func getNode(endpoints []string) (string, error) {
return endpointURL.Host, nil
}

// GzipCompress compresses r into memory and returns a *bytes.Reader of the compressed data.
// Using *bytes.Reader ensures Go's http.NewRequest automatically sets GetBody, which is
// required for the HTTP client to replay the body when Doris FE issues a 307 redirect to BE.
// Callers should call this once before the retry loop to avoid re-compressing on each attempt.
func GzipCompress(r io.Reader) (*bytes.Reader, error) {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if _, err := io.Copy(gz, r); err != nil {
return nil, fmt.Errorf("gzip compress: %w", err)
}
if err := gz.Close(); err != nil {
return nil, fmt.Errorf("gzip close: %w", err)
}
return bytes.NewReader(buf.Bytes()), nil
}

// CreateStreamLoadRequest creates an HTTP PUT request for Doris stream load
func CreateStreamLoadRequest(cfg *config.Config, data io.Reader, attempt int) (*http.Request, error) {
// Get a random endpoint host
Expand Down Expand Up @@ -149,6 +167,15 @@ func buildStreamLoadOptions(cfg *config.Config) map[string]string {
// Don't add group_commit option
}

// Add compress_type header if gzip is enabled.
// Warn if user also set compress_type manually in Options to avoid silent conflicts.
if cfg.EnableGzip {
if _, exists := result["compress_type"]; exists {
log.Warnf("Both EnableGzip and Options[\"compress_type\"] are set; EnableGzip takes precedence, overriding to gz")
}
result["compress_type"] = "gz"
}

return result
}

Expand Down
Loading