From 879a33cec7bfa6b946990a93a2a3d39a3af114d3 Mon Sep 17 00:00:00 2001
From: Yihang Wang
Date: Thu, 1 Feb 2024 15:54:41 +0800
Subject: [PATCH] perf: speed up task loading

---
 main.go                |  5 +----
 pkg/model/scheduler.go | 37 +++++++++++++++++++++++++++++++++++++
 2 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/main.go b/main.go
index b4c300f..cb8eaaa 100644
--- a/main.go
+++ b/main.go
@@ -54,10 +54,7 @@ func load() chan *model.Task {
 
 func main() {
 	numWorkers := opts.NumWorkers
-	numTasks := 0
-	for range load() {
-		numTasks++
-	}
+	numTasks := model.CountLines(opts.InputFilePath)
 	slog.Info("all tasks loaded", slog.Int("num_tasks", numTasks))
 	resultChans := make([]chan *model.Task, 0, numWorkers)
 	for _, taskChan := range model.FanOut(load(), numWorkers) {
diff --git a/pkg/model/scheduler.go b/pkg/model/scheduler.go
index d8e0dbd..387f208 100644
--- a/pkg/model/scheduler.go
+++ b/pkg/model/scheduler.go
@@ -6,6 +6,7 @@ import (
 	"log/slog"
 	"math/rand"
 	"os"
+	"strings"
 	"sync"
 	"time"
 )
@@ -75,6 +76,7 @@ func LoadTasks[T ITask](factory TaskFactory[T], inputFilePath string, port int,
 		defer close(out)
 		index := 0
 		rng := rand.NewSource(seed)
+		slog.Info("loading tasks from file", slog.String("file", inputFilePath), slog.Int64("num_shards", numShards), slog.Int64("shard", shard))
 		for line := range ReadFile(inputFilePath) {
 			if rng.Int63()%numShards == shard {
 				task := factory(index, line, port, path, host, timeout, numRetries)
@@ -86,6 +88,41 @@ func LoadTasks[T ITask](factory TaskFactory[T], inputFilePath string, port int,
 	return out
 }
 
+// CountLines returns the number of non-blank lines in the file at path.
+//
+// A line counts as blank when strings.TrimSpace reduces it to "". When the
+// file cannot be opened or scanned, the error is logged and 0 is returned.
+//
+// NOTE(review): every non-blank line is counted, without the per-shard
+// filter that LoadTasks applies; confirm the total is what callers need
+// when numShards > 1.
+func CountLines(path string) int {
+	// Lines may be longer than bufio.MaxScanTokenSize (64 KiB); without a
+	// larger cap the scan stops with bufio.ErrTooLong and the count is lost.
+	const maxLineBytes = 64 << 20
+
+	fd, err := os.Open(path)
+	if err != nil {
+		slog.Error("error occured while opening file", slog.String("error", err.Error()))
+		return 0
+	}
+	defer fd.Close()
+
+	scanner := bufio.NewScanner(fd)
+	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)
+	count := 0
+	for scanner.Scan() {
+		if strings.TrimSpace(scanner.Text()) != "" {
+			count++
+		}
+	}
+	if err := scanner.Err(); err != nil {
+		slog.Error("error occured while scanning file", slog.String("error", err.Error()))
+		return 0
+	}
+	return count
+}
+
 func StoreTasks[T ITask](tasks chan T, outputFilePath string, statusUpdatesFilePath string, numTasks int) {
 	outputFd, err := os.OpenFile(outputFilePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
 	if err != nil {