From e1fb378acce53d8c3035ee4813ae377aaf51aa3c Mon Sep 17 00:00:00 2001
From: Eugene Huang <1035393+elh@users.noreply.github.com>
Date: Sat, 3 Feb 2024 04:37:28 +0800
Subject: [PATCH] Add elh's Go solution (#435)

* add elh's Go solution

* update elh. fix a bad for loop and add some tuning env vars
---
 calculate_average_elh.sh   | 18 +++
 github_users.txt           |  1 +
 prepare_elh.sh             | 18 +++
 src/main/go/elh/Dockerfile | 24 +++
 src/main/go/elh/go.mod     |  3 +
 src/main/go/elh/main.go    | 301 +++++++++++++++++++++++++++++++++++++
 6 files changed, 365 insertions(+)
 create mode 100755 calculate_average_elh.sh
 create mode 100755 prepare_elh.sh
 create mode 100644 src/main/go/elh/Dockerfile
 create mode 100644 src/main/go/elh/go.mod
 create mode 100644 src/main/go/elh/main.go

diff --git a/calculate_average_elh.sh b/calculate_average_elh.sh
new file mode 100755
index 000000000..b37cdddcc
--- /dev/null
+++ b/calculate_average_elh.sh
@@ -0,0 +1,18 @@
+#!/bin/sh
+#
+# Copyright 2023 The original authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+target/elh/1brc-go
diff --git a/github_users.txt b/github_users.txt
index eb3ac2ca1..cb1d35e03 100644
--- a/github_users.txt
+++ b/github_users.txt
@@ -55,3 +55,4 @@ jincongho;Jin Cong Ho
 yonatang;Yonatan Graber
 adriacabeza;Adrià Cabeza
 AlexanderYastrebov;Alexander Yastrebov
+elh;Eugene Huang
diff --git a/prepare_elh.sh b/prepare_elh.sh
new file mode 100755
index 000000000..b4cb4f62c
--- /dev/null
+++ b/prepare_elh.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+#
+# Copyright 2023 The original authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+DOCKER_BUILDKIT=1 docker build -o target/elh src/main/go/elh
diff --git a/src/main/go/elh/Dockerfile b/src/main/go/elh/Dockerfile
new file mode 100644
index 000000000..920570f9e
--- /dev/null
+++ b/src/main/go/elh/Dockerfile
@@ -0,0 +1,24 @@
+#
+# Copyright 2023 The original authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM golang AS builder
+WORKDIR /app
+COPY . ./
+RUN go build -ldflags "-w -s" -o /1brc-go .
+
+FROM scratch AS runner
+WORKDIR /
+COPY --from=builder /1brc-go /
diff --git a/src/main/go/elh/go.mod b/src/main/go/elh/go.mod
new file mode 100644
index 000000000..e05dc76ad
--- /dev/null
+++ b/src/main/go/elh/go.mod
@@ -0,0 +1,3 @@
+module github.com/elh/1brc-go
+
+go 1.21.5
diff --git a/src/main/go/elh/main.go b/src/main/go/elh/main.go
new file mode 100644
index 000000000..4f42d38df
--- /dev/null
+++ b/src/main/go/elh/main.go
@@ -0,0 +1,301 @@
+package main
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"log"
+	"math"
+	"os"
+	"path/filepath"
+	"runtime"
+	"runtime/pprof"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+	"unsafe"
+)
+
+// go run main.go [measurements_file]
+// tune env vars for performance
+//
+// Environment variables:
+// - NUM_PARSERS: number of parsers to run concurrently. if unset, defaults
+//   to runtime.NumCPU()
+// - PARSE_CHUNK_SIZE_MB: size of each chunk to parse. if unset, defaults to
+//   defaultParseChunkSizeMB
+// - PROFILE: if "true", enables profiling
+
+var (
+	// others: "heap", "threadcreate", "block", "mutex"
+	profileTypes = []string{"goroutine", "allocs"}
+)
+
+const (
+	defaultMeasurementsPath = "measurements.txt"
+	maxNameLen              = 100
+	maxNameNum              = 10000
+
+	// tuned for a 2023 MacBook M2 Pro
+	defaultParseChunkSizeMB = 64
+	mb                      = 1024 * 1024 // bytes
+)
+
+type Stats struct {
+	Min, Max, Sum float64
+	Count         int
+}
+
+// round rounds floats to 1 decimal place, with 0.05 rounding up to 0.1.
+func round(x float64) float64 {
+	return math.Floor((x+0.05)*10) / 10
+}
+
+// parseFloatFast is a high-performance float parser using the assumption that
+// the byte slice will always have a single decimal digit.
+func parseFloatFast(bs []byte) float64 {
+	var intStartIdx int // index where the integer part starts; 1 if negative
+	if bs[0] == '-' {
+		intStartIdx = 1
+	}
+
+	v := float64(bs[len(bs)-1]-'0') / 10 // single decimal digit
+	place := 1.0
+	for i := len(bs) - 3; i >= intStartIdx; i-- { // integer part
+		v += float64(bs[i]-'0') * place
+		place *= 10
+	}
+
+	if intStartIdx == 1 {
+		v *= -1
+	}
+	return v
+}
+
+// size is the intended number of bytes to parse. buf should be longer than size
+// because we need to continue reading until the end of the line in order to
+// properly segment the entire file and not miss any data.
+func parseAt(f *os.File, buf []byte, offset int64, size int) map[string]*Stats {
+	stats := make(map[string]*Stats, maxNameNum)
+	n, err := f.ReadAt(buf, offset) // load the buffer
+	if err != nil && err != io.EOF {
+		log.Fatal(err)
+	}
+
+	lastName := make([]byte, maxNameLen) // last name parsed
+	var lastNameLen int
+	isScanningName := true // currently scanning name or value?
+
+	// if offset is non-zero, skip to the first newline
+	var idx, start int
+	if offset != 0 {
+		for idx < n {
+			if buf[idx] == '\n' {
+				idx++
+				start = idx
+				break
+			}
+			idx++
+		}
+	}
+	// tick-tock between parsing names and values while accumulating stats
+	for {
+		if isScanningName {
+			for idx < n {
+				if buf[idx] == ';' {
+					nameBs := buf[start:idx]
+					lastNameLen = copy(lastName, nameBs)
+
+					idx++
+					start = idx
+					isScanningName = false
+					break
+				}
+				idx++
+			}
+		} else {
+			for idx < n {
+				if buf[idx] == '\n' {
+					valueBs := buf[start:idx]
+					value := parseFloatFast(valueBs)
+
+					nameUnsafe := unsafe.String(&lastName[0], lastNameLen)
+					if s, ok := stats[nameUnsafe]; !ok {
+						name := string(lastName[:lastNameLen]) // actually allocate string
+						stats[name] = &Stats{Min: value, Max: value, Sum: value, Count: 1}
+					} else {
+						if value < s.Min {
+							s.Min = value
+						}
+						if value > s.Max {
+							s.Max = value
+						}
+						s.Sum += value
+						s.Count++
+					}
+
+					idx++
+					start = idx
+					isScanningName = true
+					break
+				}
+				idx++
+			}
+		}
+		// terminate when we hit the first newline after the intended size OR
+		// when we hit the end of the file
+		if (isScanningName && idx >= size) || idx >= n {
+			break
+		}
+	}
+
+	return stats
+}
+
+func printResults(stats map[string]*Stats) {
+	// sorted alphabetically for output
+	names := make([]string, 0, len(stats))
+	for name := range stats {
+		names = append(names, name)
+	}
+	sort.Strings(names)
+
+	var builder strings.Builder
+	for i, name := range names {
+		s := stats[name]
+		// gotcha: first round the sum to remove float precision errors!
+		avg := round(round(s.Sum) / float64(s.Count))
+		builder.WriteString(fmt.Sprintf("%s=%.1f/%.1f/%.1f", name, s.Min, avg, s.Max))
+		if i < len(names)-1 {
+			builder.WriteString(", ")
+		}
+	}
+
+	writer := bufio.NewWriter(os.Stdout)
+	fmt.Fprintf(writer, "{%s}\n", builder.String())
+	writer.Flush()
+}
+
+// Read file in chunks and parse concurrently. N parsers work off of a chunk
+// offset chan and send results on an output chan. The results are merged into a
+// single map of stats and printed.
+func main() {
+	// parse env vars and inputs
+	shouldProfile := os.Getenv("PROFILE") == "true"
+	var err error
+	var numParsers int
+	{
+		if os.Getenv("NUM_PARSERS") != "" {
+			numParsers, err = strconv.Atoi(os.Getenv("NUM_PARSERS"))
+			if err != nil {
+				log.Fatal(fmt.Errorf("failed to parse NUM_PARSERS: %w", err))
+			}
+		} else {
+			numParsers = runtime.NumCPU()
+		}
+	}
+	var parseChunkSize int
+	{
+		if os.Getenv("PARSE_CHUNK_SIZE_MB") != "" {
+			parseChunkSizeMB, err := strconv.Atoi(os.Getenv("PARSE_CHUNK_SIZE_MB"))
+			if err != nil {
+				log.Fatal(fmt.Errorf("failed to parse PARSE_CHUNK_SIZE_MB: %w", err))
+			}
+			parseChunkSize = parseChunkSizeMB * mb
+		} else {
+			parseChunkSize = defaultParseChunkSizeMB * mb
+		}
+	}
+
+	measurementsPath := defaultMeasurementsPath
+	if len(os.Args) > 1 {
+		measurementsPath = os.Args[1]
+	}
+
+	// profile code
+	if shouldProfile {
+		nowUnix := time.Now().Unix()
+		os.MkdirAll(fmt.Sprintf("profiles/%d", nowUnix), 0755)
+		for _, profileType := range profileTypes {
+			file, _ := os.Create(fmt.Sprintf("profiles/%d/%s.%s.pprof",
+				nowUnix, filepath.Base(measurementsPath), profileType))
+			defer file.Close()
+			defer pprof.Lookup(profileType).WriteTo(file, 0)
+		}
+
+		file, _ := os.Create(fmt.Sprintf("profiles/%d/%s.cpu.pprof",
+			nowUnix, filepath.Base(measurementsPath)))
+		defer file.Close()
+		pprof.StartCPUProfile(file)
+		defer pprof.StopCPUProfile()
+	}
+
+	// read file
+	f, err := os.Open(measurementsPath)
+	if err != nil {
+		log.Fatal(fmt.Errorf("failed to open %s file: %w", measurementsPath, err))
+	}
+	defer f.Close()
+
+	info, err := f.Stat()
+	if err != nil {
+		log.Fatal(fmt.Errorf("failed to read %s file: %w", measurementsPath, err))
+	}
+
+	// kick off "parser" workers
+	wg := sync.WaitGroup{}
+	wg.Add(numParsers)
+
+	// buffered to not block on merging
+	chunkOffsetCh := make(chan int64, numParsers)
+	chunkStatsCh := make(chan map[string]*Stats, numParsers)
+
+	go func() {
+		i := 0
+		for i < int(info.Size()) {
+			chunkOffsetCh <- int64(i)
+			i += parseChunkSize
+		}
+		close(chunkOffsetCh)
+	}()
+
+	for i := 0; i < numParsers; i++ {
+		// WARN: w/ extra padding for line overflow. Each chunk should be read past
+		// the intended size to the next new line. 128 bytes should be enough for
+		// a max 100 byte name + the float value.
+		buf := make([]byte, parseChunkSize+128)
+		go func() {
+			for chunkOffset := range chunkOffsetCh {
+				chunkStatsCh <- parseAt(f, buf, chunkOffset, parseChunkSize)
+			}
+			wg.Done()
+		}()
+	}
+
+	go func() {
+		wg.Wait()
+		close(chunkStatsCh)
+	}()
+
+	mergedStats := make(map[string]*Stats, maxNameNum)
+	for chunkStats := range chunkStatsCh {
+		for name, s := range chunkStats {
+			if ms, ok := mergedStats[name]; !ok {
+				mergedStats[name] = s
+			} else {
+				if s.Min < ms.Min {
+					ms.Min = s.Min
+				}
+				if s.Max > ms.Max {
+					ms.Max = s.Max
+				}
+				ms.Sum += s.Sum
+				ms.Count += s.Count
+			}
+		}
+	}
+
+	printResults(mergedStats)
+}
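
A note for reviewers on the two numeric helpers: parseFloatFast leans on the challenge guarantee that every measurement has exactly one decimal digit, and round rounds to one decimal place with 0.05 going up to 0.1. The sketch below is not part of the patch; it is a hypothetical companion test (the file name and sample inputs are mine) that could sit next to main.go in the same package and exercise both helpers under that assumption.

// parsefloat_sketch_test.go (hypothetical, not included in the patch)
package main

import (
	"math"
	"testing"
)

func TestNumericHelpersSketch(t *testing.T) {
	// parseFloatFast assumes the challenge format: optional '-', integer part,
	// '.', exactly one decimal digit.
	cases := map[string]float64{
		"0.0":   0.0,
		"12.3":  12.3,
		"-5.7":  -5.7,
		"-99.9": -99.9,
	}
	for in, want := range cases {
		if got := parseFloatFast([]byte(in)); math.Abs(got-want) > 1e-9 {
			t.Errorf("parseFloatFast(%q) = %v, want %v", in, got, want)
		}
	}

	// round rounds to one decimal place, with 0.05 rounding up to 0.1.
	if got := round(0.05); math.Abs(got-0.1) > 1e-9 {
		t.Errorf("round(0.05) = %v, want 0.1", got)
	}
}

If added under src/main/go/elh, it would run with a plain "go test" from that directory.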
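
A second note on the per-line map lookup in parseAt: each station name is copied into a reused buffer, unsafe.String gives a zero-allocation string view of that buffer for the map probe, and a real string key is allocated only the first time a station is seen; later updates go through the stored *Stats pointer, so the unsafe view is never written into the map. The standalone sketch below is hypothetical and not part of the patch (it needs Go 1.20+ for unsafe.String); it shows the same pattern with a simple counter map.

// zero-copy map probe sketch (hypothetical, standalone program)
package main

import (
	"fmt"
	"unsafe"
)

func main() {
	counts := make(map[string]*int)
	buf := make([]byte, 64) // reused scratch buffer, like lastName in parseAt

	record := func(name []byte) {
		n := copy(buf, name)
		// string view of buf, used only for the read-only map probe
		key := unsafe.String(&buf[0], n)
		if c, ok := counts[key]; ok {
			*c++ // mutate through the pointer; no map write, so the unsafe key is never stored
			return
		}
		v := 1
		counts[string(name)] = &v // first sighting: allocate a stable string key
	}

	record([]byte("Oslo"))
	record([]byte("Paris"))
	record([]byte("Oslo"))
	for name, c := range counts {
		fmt.Println(name, *c) // Oslo 2 and Paris 1, in random map order
	}
}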