Skip to content

Commit

Permalink
perf: speed up task loading
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Feb 1, 2024
1 parent 12db1d3 commit 879a33c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
5 changes: 1 addition & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@ func load() chan *model.Task {

func main() {
numWorkers := opts.NumWorkers
numTasks := 0
for range load() {
numTasks++
}
numTasks := model.CountLines(opts.InputFilePath)
slog.Info("all tasks loaded", slog.Int("num_tasks", numTasks))
resultChans := make([]chan *model.Task, 0, numWorkers)
for _, taskChan := range model.FanOut(load(), numWorkers) {
Expand Down
23 changes: 23 additions & 0 deletions pkg/model/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"math/rand"
"os"
"strings"
"sync"
"time"
)
Expand Down Expand Up @@ -75,6 +76,7 @@ func LoadTasks[T ITask](factory TaskFactory[T], inputFilePath string, port int,
defer close(out)
index := 0
rng := rand.NewSource(seed)
slog.Info("loading tasks from file", slog.String("file", inputFilePath), slog.Int64("num_shards", numShards), slog.Int64("shard", shard))
for line := range ReadFile(inputFilePath) {
if rng.Int63()%numShards == shard {
task := factory(index, line, port, path, host, timeout, numRetries)
Expand All @@ -86,6 +88,27 @@ func LoadTasks[T ITask](factory TaskFactory[T], inputFilePath string, port int,
return out
}

// CountLines returns the number of non-blank lines in the file at path.
//
// A line is counted when it still has content after leading and trailing
// whitespace is trimmed. Failures to open or scan the file are logged through
// slog and reported as a count of 0, so a caller cannot tell an unreadable
// file apart from an empty one.
//
// The scanner's per-line limit is raised from the bufio default
// (bufio.MaxScanTokenSize, 64 KiB) to 16 MiB. With the default limit a single
// oversized line aborts the scan with bufio.ErrTooLong and the whole file is
// reported as having 0 lines.
func CountLines(path string) int {
	// Upper bound on the length of one line; the scanner buffer only grows
	// towards this size when a line actually needs it.
	const maxLineBytes = 16 * 1024 * 1024

	fd, err := os.Open(path)
	if err != nil {
		slog.Error("error occured while opening file", slog.String("file", path), slog.String("error", err.Error()))
		return 0
	}
	defer fd.Close()

	scanner := bufio.NewScanner(fd)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)

	count := 0
	for scanner.Scan() {
		// Whitespace-only lines are not counted.
		if strings.TrimSpace(scanner.Text()) != "" {
			count++
		}
	}
	if err := scanner.Err(); err != nil {
		slog.Error("error occured while scanning file", slog.String("file", path), slog.String("error", err.Error()))
		return 0
	}
	return count
}

func StoreTasks[T ITask](tasks chan T, outputFilePath string, statusUpdatesFilePath string, numTasks int) {
outputFd, err := os.OpenFile(outputFilePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
Expand Down

0 comments on commit 879a33c

Please sign in to comment.