From 12db1d3f08abce814331feffc8a39abed23e59ee Mon Sep 17 00:00:00 2001 From: Yihang Wang Date: Thu, 1 Feb 2024 14:48:51 +0800 Subject: [PATCH] feat: support task sharding --- README.md | 21 +++++++++++++-------- main.go | 20 ++++++++++++++------ pkg/model/scheduler.go | 12 ++++++++---- 3 files changed, 35 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 16aa467..b66a616 100644 --- a/README.md +++ b/README.md @@ -18,16 +18,21 @@ Usage: http-grab [OPTIONS] Application Options: - -i, --input= input file path - -o, --output= output file path - -n, --num-workers= number of workers (default: 32) - -t, --timeout= timeout (default: 8) - -p, --port= port (default: 80) - -P, --path= path (default: index.html) - -H, --host= host + -i, --input= input file path + -o, --output= output file path + -s, --status-updates= status updates file path + -n, --num-workers= number of workers (default: 32) + --seed= seed (default: 0) + --num-shards= number of shards (default: 1) + --shard= shard (default: 0) + -p, --port= port (default: 80) + --path= path (default: index.html) + --host= http host header + -m, --max-tries= max tries (default: 4) + -t, --timeout= timeout (default: 8) Help Options: - -h, --help Show this help message + -h, --help Show this help message ``` ```bash diff --git a/main.go b/main.go index 35bc66f..b4c300f 100644 --- a/main.go +++ b/main.go @@ -12,12 +12,17 @@ type Options struct { InputFilePath string `short:"i" long:"input" description:"input file path" required:"true"` OutputFilePath string `short:"o" long:"output" description:"output file path" required:"true"` StatusUpdatesFilePath string `short:"s" long:"status-updates" description:"status updates file path"` - NumWorkers int `short:"n" long:"num-workers" description:"number of workers" default:"32"` - Timeout int `short:"t" long:"timeout" description:"timeout" default:"8"` - Port int `short:"p" long:"port" description:"port" default:"80"` - Path string `short:"P" long:"path" description:"path" default:"index.html"` - Host string `short:"H" long:"host" description:"host" default:""` - MaxTries int `short:"m" long:"max-tries" description:"max tries" default:"4"` + + NumWorkers int `short:"n" long:"num-workers" description:"number of workers" default:"32"` + Seed int64 `long:"seed" description:"seed" default:"0"` + NumShards int64 `long:"num-shards" description:"number of shards" default:"1"` + Shard int64 `long:"shard" description:"shard" default:"0"` + + Port int `short:"p" long:"port" description:"port" default:"80"` + Path string `long:"path" description:"path" default:"index.html"` + Host string `long:"host" description:"http host header" default:""` + MaxTries int `short:"m" long:"max-tries" description:"max tries" default:"4"` + Timeout int `short:"t" long:"timeout" description:"timeout" default:"8"` } var opts Options @@ -41,6 +46,9 @@ func load() chan *model.Task { opts.Host, opts.Timeout, opts.MaxTries, + opts.Seed, + opts.NumShards, + opts.Shard, ) } diff --git a/pkg/model/scheduler.go b/pkg/model/scheduler.go index 23d98a6..d8e0dbd 100644 --- a/pkg/model/scheduler.go +++ b/pkg/model/scheduler.go @@ -4,6 +4,7 @@ import ( "bufio" "fmt" "log/slog" + "math/rand" "os" "sync" "time" @@ -68,15 +69,18 @@ type ITask interface { type TaskFactory[T ITask] func(index int, ip string, port int, path string, host string, timeout int, numRetries int) T -func LoadTasks[T ITask](factory TaskFactory[T], inputFilePath string, port int, path string, host string, timeout int, numRetries int) chan T { +func LoadTasks[T ITask](factory TaskFactory[T], inputFilePath string, port int, path string, host string, timeout int, numRetries int, seed int64, numShards int64, shard int64) chan T { out := make(chan T) go func() { defer close(out) index := 0 + rng := rand.NewSource(seed) for line := range ReadFile(inputFilePath) { - task := factory(index, line, port, path, host, timeout, numRetries) - out <- task - index++ + if rng.Int63()%numShards == shard { + task := factory(index, line, port, path, host, timeout, numRetries) + out <- task + index++ + } } }() return out