fix: fix line protocol parser #1833

Merged on Jan 2, 2024 (2 commits)
Kree0 (Contributor) commented on Dec 20, 2023

Required checklist

  • Sample config files updated (config, meta/config, and default config)
  • If there are user-facing changes, the documentation needs to be updated prior to approving the PR (Link)
  • If there are any breaking changes to public APIs, please add the api change label.
  • Signed CLA (if not already signed)

Which issue does this PR close?

#1830
#1817

Rationale for this change

Are there any user-facing changes?

Kree0 force-pushed the fix/lp branch 2 times, most recently from 01b7c1e to 6182b94, on December 20, 2023 03:30
zipper-meng (Member) commented on Dec 20, 2023

Performance

  • File: 10G line-protocol.

Old

Mode              Cost Batch-Avg  Cost Total
single_line       898ns           50s
multi_line 1000   865868ns        47s
multi_line 10000  9105261ns       49s

Now

Mode              Cost Batch-Avg  Cost Total
single_line       2358ns          112s
multi_line 1000   1978757ns       100s
multi_line 10000  20917411ns      107s
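
Comparing the two tables row by row, the new parser comes out roughly 2.1 to 2.6 times slower than the old one on this input. The following is a minimal sketch of that arithmetic; the `slowdown` helper is a name chosen here for illustration and is not part of the PR, and the figures are copied from the tables above.

```rust
/// Ratio of the new cost to the old cost (values above 1.0 mean "slower").
/// Hypothetical helper for illustration only; not part of the PR.
fn slowdown(old: f64, new: f64) -> f64 {
    new / old
}

fn main() {
    // (mode, old batch-avg ns, new batch-avg ns, old total s, new total s),
    // copied from the "Old" and "Now" tables.
    let rows = [
        ("single_line", 898.0, 2358.0, 50.0, 112.0),
        ("multi_line 1000", 865_868.0, 1_978_757.0, 47.0, 100.0),
        ("multi_line 10000", 9_105_261.0, 20_917_411.0, 49.0, 107.0),
    ];
    for (mode, old_avg, new_avg, old_total, new_total) in rows {
        println!(
            "{mode}: batch-avg x{:.2}, total x{:.2}",
            slowdown(old_avg, new_avg),
            slowdown(old_total, new_total)
        );
    }
}
```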

Bench code

Rust

use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::process::exit;
use std::time::Duration;

use protocol_parser::line_protocol::parser::Parser;

fn print_usage() {
    eprintln!(
        r#"Benchmark for line protocol parser.
Usage: benchmark single_line <file path>
    or benchmark multi_line <file path> <line num>
"#
    );
}

fn main() {
    let args = std::env::args().skip(1).collect::<Vec<String>>();
    if args.len() < 2 || args.len() > 3 {
        print_usage();
        exit(1);
    }
    let mode = args[0].as_str();
    eprintln!("- mode: {mode}");
    let path = PathBuf::from(&args[1]);
    eprintln!("- path: {}", path.display());

    let file = File::open(path).unwrap();
    let parser = Parser::new(1);
    let start_time = std::time::Instant::now();
    if mode == "single_line" {
        let avg = single_line_mode(file, &parser);
        println!("avg: {}ns", avg.as_nanos());
    } else if mode == "multi_line" {
        if args.len() != 3 {
            print_usage();
            exit(1);
        }
        let line_num: u32 = args[2].parse().unwrap();
        eprintln!("- line: {line_num}");
        let avg = multi_line_mode(file, &parser, line_num as usize);
        println!("avg: {}ns", avg.as_nanos());
    } else {
        print_usage();
        exit(1);
    }
    println!("total: {}s", start_time.elapsed().as_secs());
}

fn single_line_mode(file: File, parser: &Parser) -> Duration {
    let mut reader = BufReader::with_capacity(1024 * 1024 * 128, file);
    let mut line = String::with_capacity(1024);
    // Total parse time and number of parsed lines; the mean is computed once
    // at the end so that no samples are discarded along the way.
    let mut total_nanos = 0_u128;
    let mut parsed = 0_u128;
    loop {
        match reader.read_line(&mut line) {
            Ok(0) => break, // EOF
            Ok(_) => {
                let start_time = std::time::Instant::now();
                match parser.parse(&line) {
                    Ok(_l) => {
                        total_nanos += start_time.elapsed().as_nanos();
                        parsed += 1;
                    }
                    Err(e) => panic!("Parse error: {e}"),
                }

                line.clear();
            }
            Err(e) => panic!("Read file error: {e}"),
        }
    }
    // `max(1)` avoids a division by zero when the file is empty.
    Duration::from_nanos((total_nanos / parsed.max(1)) as u64)
}

fn multi_line_mode(file: File, parser: &Parser, line_num: usize) -> Duration {
    let mut reader = BufReader::with_capacity(1024 * 1024 * 16, file);
    let mut buf = String::with_capacity(1024 * 1024);
    let mut ln = 0_usize; // number of lines currently buffered
    // Total parse time and number of parsed batches; the mean is computed once at the end.
    let mut total_nanos = 0_u128;
    let mut batches = 0_u128;
    loop {
        let n = match reader.read_line(&mut buf) {
            Ok(n) => n,
            Err(e) => panic!("Read file error: {e}"),
        };
        if n > 0 {
            ln += 1;
            // Keep buffering until the batch holds exactly `line_num` lines.
            if ln < line_num {
                continue;
            }
        }

        // Parse when the batch is full, or at EOF if a partial batch remains.
        if ln > 0 {
            let start_time = std::time::Instant::now();
            match parser.parse(&buf) {
                Ok(_l) => {
                    total_nanos += start_time.elapsed().as_nanos();
                    batches += 1;
                }
                Err(e) => panic!("Parse error: {e}"),
            }
            buf.clear();
            ln = 0;
        }
        if n == 0 {
            break; // EOF
        }
    }
    // `max(1)` avoids a division by zero when the file is empty.
    Duration::from_nanos((total_nanos / batches.max(1)) as u64)
}
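
The per-call average that the benchmark reports can be isolated into a small accumulator that sums every sample and divides once at the end. This is a minimal sketch under stated assumptions: `MeanAccumulator` is a hypothetical name that does not appear in the PR, and a string-length call stands in for `parser.parse`, whose exact signature is not shown here.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of the PR): sums timing samples and reports their mean.
#[derive(Default)]
struct MeanAccumulator {
    total_nanos: u128,
    samples: u128,
}

impl MeanAccumulator {
    /// Adds one timing sample.
    fn record(&mut self, elapsed: Duration) {
        self.total_nanos += elapsed.as_nanos();
        self.samples += 1;
    }

    /// Mean of all recorded samples, or `None` if nothing was recorded.
    fn mean(&self) -> Option<Duration> {
        if self.samples == 0 {
            return None;
        }
        Some(Duration::from_nanos((self.total_nanos / self.samples) as u64))
    }
}

fn main() {
    let mut acc = MeanAccumulator::default();
    for input in ["a=1", "b=2", "c=3"] {
        let start = Instant::now();
        // Stand-in for the real parse call; `black_box` keeps it from being optimized away.
        let _len = std::hint::black_box(input.len());
        acc.record(start.elapsed());
    }
    println!("mean: {:?}", acc.mean());
}
```

Returning `Option<Duration>` makes the empty-input case explicit instead of dividing by zero.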

Script

Single line mode

cargo run --release -- single_line seed-123_scale-100_20240101-20240104_interval-1s

Multi line mode

cargo run --release -- multi_line seed-123_scale-100_20240101-20240104_interval-1s 1000
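
The trailing `1000` is the number of lines grouped into each parse call. A minimal sketch of that grouping, independent of the parser, might look like the following; `read_batches` is a hypothetical helper and the sample rows are made up for illustration.

```rust
use std::io::{BufRead, Cursor};

/// Hypothetical helper (not part of the PR): splits a reader's content into
/// batches of up to `batch_lines` lines each, keeping the trailing partial batch.
fn read_batches<R: BufRead>(mut reader: R, batch_lines: usize) -> std::io::Result<Vec<String>> {
    let mut batches = Vec::new();
    let mut buf = String::new();
    let mut buffered = 0_usize;
    loop {
        let n = reader.read_line(&mut buf)?;
        if n > 0 {
            buffered += 1;
        }
        // Flush when the batch is full, or at EOF if a partial batch remains.
        if buffered > 0 && (buffered >= batch_lines || n == 0) {
            batches.push(std::mem::take(&mut buf));
            buffered = 0;
        }
        if n == 0 {
            return Ok(batches);
        }
    }
}

fn main() {
    // Made-up line-protocol-style rows, used only for illustration.
    let data = "cpu,host=a usage=1 1\ncpu,host=b usage=2 2\ncpu,host=c usage=3 3\n";
    let batches = read_batches(Cursor::new(data), 2).expect("reading from memory");
    println!("{} batches", batches.len());
}
```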

roseboy-liu previously approved these changes on Dec 29, 2023
roseboy-liu merged commit d6619a0 into cnosdb:main on Jan 2, 2024
7 checks passed
Kree0 deleted the fix/lp branch on January 2, 2024 09:46