Create transactions archive using Clickhouse as data source #80
Conversation
Pull Request Overview
This PR adds support for creating transaction archives using ClickHouse as a data source, providing an alternative to local CSV files for better infrastructure redundancy. The implementation includes a new ClickHouse client that can load transaction data directly from the database within specified date ranges.
- Adds ClickHouse connectivity and data loading capabilities for transaction archives
- Temporarily disables source analysis features due to data size considerations
- Introduces date parsing utilities to support flexible date input formats
Reviewed Changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| cmd/merge/clickhouse.go | New ClickHouse client implementation for loading transaction data |
| cmd/merge/main.go | Adds CLI flags for ClickHouse DSN and date range parameters |
| cmd/merge/transactions.go | Integrates ClickHouse data loading with existing transaction processing logic |
| common/utils.go | Adds flexible date string parsing utility function |
| common/utils_test.go | Test coverage for the new date parsing functionality |
| common/analyzer.go | Comments out source analysis and latency comparison features |
| go.mod | Removes unused HdrHistogram dependency |
| scripts/upload.sh | Comments out transaction blacklist parameter |
```go
GROUP BY (hash, chain_id, tx_type, from, to, value, nonce, gas, gas_price, gas_tip_cap, gas_fee_cap, data_size, data_4bytes)
SETTINGS max_threads = 8,
	max_block_size = 65536,
	group_by_two_level_threshold = 100000`, timeStart, timeEnd)
```
The SQL query uses magic numbers for SETTINGS (max_threads = 8, max_block_size = 65536, group_by_two_level_threshold = 100000). Consider defining these as named constants to improve maintainability and make their purpose clearer.
Suggested change:

```go
query := fmt.Sprintf(`SELECT
	min(received_at), hash, chain_id, tx_type, from, to, value, nonce, gas, gas_price, gas_tip_cap, gas_fee_cap, data_size, data_4bytes, any(raw_tx)
FROM transactions WHERE received_at >= ? AND received_at < ?
GROUP BY (hash, chain_id, tx_type, from, to, value, nonce, gas, gas_price, gas_tip_cap, gas_fee_cap, data_size, data_4bytes)
SETTINGS max_threads = %d,
	max_block_size = %d,
	group_by_two_level_threshold = %d`, clickhouseMaxThreads, clickhouseMaxBlockSize, clickhouseGroupByTwoLevelThreshold)
rows, err := ch.conn.Query(ctx, query, timeStart, timeEnd)
```
Good point. They were suggested by ChatGPT and seemed to improve performance; leaving them as-is for now.
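For reference, the constants named in the suggestion could be defined along these lines. The values come from the original query; grouping them into a single `const` block is an assumption for illustration, not code from the PR:

```go
package main

import "fmt"

// Hypothetical named constants for the ClickHouse query settings,
// matching the names referenced in the suggestion above.
const (
	clickhouseMaxThreads               = 8      // parallelism for the query
	clickhouseMaxBlockSize             = 65536  // rows per block read
	clickhouseGroupByTwoLevelThreshold = 100000 // switch to two-level GROUP BY above this
)

func main() {
	fmt.Println(clickhouseMaxThreads, clickhouseMaxBlockSize, clickhouseGroupByTwoLevelThreshold)
}
```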
```go
err := ch.connect()
if err != nil {
```
stylistic optional nit:
Suggested change:

```go
if err := ch.connect(); err != nil {
```
```go
if len(inputFiles) > 0 {
	// Check input files
	for _, fn := range append(inputFiles, sourcelogFiles...) {
		common.MustBeCSVFile(log, fn)
	}

	//
	// Load input files
	//
	txs, err := common.LoadTransactionCSVFiles(log, inputFiles, txBlacklistFiles)
	if err != nil {
		return fmt.Errorf("LoadTransactionCSVFiles: %w", err)
	}
	// Load input files
	txs, sourcelog, err = loadInputFiles(inputFiles, sourcelogFiles, txBlacklistFiles)
	if err != nil {
		return fmt.Errorf("loadInputFiles: %w", err)
	}
} else {
	dateFrom, err := common.ParseDateString(dateFrom)
	if err != nil {
		return fmt.Errorf("ParseDateString dateFrom: %w", err)
	}
	dateTo, err := common.ParseDateString(dateTo)
	if err != nil {
		return fmt.Errorf("ParseDateString dateTo: %w", err)
	}
	log.Infow("Using Clickhouse data source", "dateFrom", dateFrom.String(), "dateTo", dateTo.String())

	log.Infow("Processed all input tx files", "txTotal", printer.Sprintf("%d", len(txs)), "memUsed", common.GetMemUsageHuman())
	// return fmt.Errorf("loading from Clickhouse not implemented yet") //nolint:err113
	txs = loadDataFromClickhouse(clickhouseDSN, dateFrom, dateTo)
	sourcelog = make(map[string]map[string]int64) // empty sourcelog
}
```
The logic/checks in the two if branches seem different enough that it may be worth moving them into functions for readability. For example, common.MustBeCSVFile into loadInputFiles, and the date parsing into loadDataFromClickhouse.
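A minimal sketch of that refactor, folding the per-file check into loadInputFiles. The suffix check stands in for common.MustBeCSVFile (whose exact behavior isn't shown in this diff), and the actual loading is stubbed out:

```go
package main

import (
	"fmt"
	"strings"
)

// Hypothetical refactor: loadInputFiles validates its own inputs before
// loading, so the caller no longer needs the MustBeCSVFile loop.
func loadInputFiles(inputFiles, sourcelogFiles []string) error {
	for _, fn := range append(inputFiles, sourcelogFiles...) {
		// stand-in for common.MustBeCSVFile
		if !strings.HasSuffix(fn, ".csv") && !strings.HasSuffix(fn, ".csv.zip") {
			return fmt.Errorf("not a CSV file: %s", fn)
		}
	}
	// ... load transactions and sourcelog here (omitted in this sketch)
	return nil
}

func main() {
	fmt.Println(loadInputFiles([]string{"txs.csv"}, []string{"sourcelog.csv.zip"})) // <nil>
	fmt.Println(loadInputFiles([]string{"txs.parquet"}, nil))                       // not a CSV file: txs.parquet
}
```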
```go
if len(checkNodeURIs) == 0 {
	log.Info("No check-node specified, skipping inclusion status update")
} else {
	err = updateInclusionStatus(log, checkNodeURIs, txs)
```
optional: Would it be easier to move this check into updateInclusionStatus and make it valid to call it with empty checkNodeURIs? That way we'll keep caller simpler.
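A sketch of that suggestion: updateInclusionStatus accepts an empty checkNodeURIs slice and no-ops, keeping the caller simple. Signature and types are simplified for illustration and are not the PR's actual ones:

```go
package main

import "log"

// Hypothetical variant: tolerate an empty node list instead of making the
// caller check, so the call site needs no if/else.
func updateInclusionStatus(checkNodeURIs, txHashes []string) error {
	if len(checkNodeURIs) == 0 {
		log.Println("No check-node specified, skipping inclusion status update")
		return nil
	}
	// ... query each node for tx inclusion status (omitted in this sketch)
	return nil
}

func main() {
	// Safe to call without any check-nodes configured.
	if err := updateInclusionStatus(nil, nil); err != nil {
		log.Fatal(err)
	}
}
```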
```go
// Load sourcelog files
//
log.Infow("Loading sourcelog files...", "files", sourcelogFiles)
sourcelog, _ = common.LoadSourcelogFiles(log, sourcelogFiles)
```
That's right, it can't, at least not in the code as it was.
```go
	DSN: clickhouseDSN,
})
if err != nil {
	log.Fatalw("failed to connect to Clickhouse", "error", err)
```
It would be more uniform to return err here, as in loadInputFiles, and log.Fatal in the caller.
Since this is the top-level script, IMO it's okay to keep it quick'n'dirty for now.
```go
	log.Fatalw("failed to connect to Clickhouse", "error", err)
}

loadSecPerMin := 0.45 // estimated load speed in seconds per minute of data, based on previous runs
```
This seems to be heavily dependent on a lot of things, maybe not worth hardcoding?
it definitely is, lol
```go
// out += fmt.Sprintf("Sources: %s \n", strings.Join(TitleStrings(a.sources), ", "))
// out += fmt.Sprintln("")
```

```go
var (
	ErrUnsupportedFileFormat = errors.New("unsupported file format")
	ErrUnableToParseDate     = errors.New("unable to parse date")
```
nit: Would ErrInvalidDate be more succinct name?
```shell
--write-summary \
--check-node /mnt/data/geth/geth.ipc \
--tx-blacklist "$1/../${yesterday}/${yesterday}.csv.zip" \
# --tx-blacklist "$1/../${yesterday}/${yesterday}.csv.zip" \
```
reverted to as-was for now
📝 Summary
This PR enables creation of archives using Clickhouse as data source.
Loading the transactions alone already pulls 10+ GB of data from Clickhouse, so the archive no longer includes the sources. This is an acceptable tradeoff versus joining on the source data, which could require 3-4x as much data to be loaded. A future PR might address this if the feature is requested.
⛱ Motivation and Context
Mempool Dumpster added support for storing data in Clickhouse, which allows for a better, redundant infrastructure setup (instead of relying on local CSV files).