
Bulk inserts are tricky to do efficiently #59

Open · Diggsey opened this issue Sep 6, 2016 · 16 comments

Diggsey (Contributor) commented Sep 6, 2016

I had to implement some kind of batching myself to improve performance. This involved building up a query string over time, something like INSERT INTO table (fields...) VALUES (?, ?, ?), (?, ?, ?), (?, ?, ?), ...

The query builder is not particularly well suited to this, and building up the query via string concatenation seems relatively inefficient. Also, I could only support positional parameters, since the name-parsing code is not exposed (which means I couldn't use Params::make_positional).

I don't know if there's a more efficient way to do this at the protocol level?

At the very least I think one or more of the following would be useful:

  • Provide some guidelines in the documentation for how to do this
  • Expose the named-parameter parsing functionality so users can build the batching themselves
  • Add some new functionality to assist with this use case
blackbeam (Owner) commented Sep 9, 2016

  1. The most efficient way, I think, is to use LOAD DATA INFILE, if applicable.

  2. The second option is a large handcrafted statement or query:

    • For a handcrafted query (:warning: strongly not recommended) you should be aware of the max_allowed_packet value.
    • For a handcrafted statement, you should also be aware of the maximum number of parameters a single prepared statement may contain (65,535, since the placeholder count is a 16-bit value; see the chunking sketch after this list). It may look like this (in pseudo-Rust and without checks):
    // Build one multi-row INSERT and bind every row's parameters positionally.
    // `Obj` is a stand-in for the caller's row type.
    fn bulk_insert<F, P>(
        pool: &crate::Pool,
        table: String,
        cols: Vec<String>,
        objects: Vec<Obj>,
        fun: F,
    ) -> crate::Result<()>
    where
        F: Fn(&Obj) -> P,
        P: Into<Params>,
    {
        let mut stmt = format!("INSERT INTO {} ({}) VALUES ", table, cols.join(","));
        let row = format!(
            "({}),",
            cols.iter()
                .map(|_| "?".to_string())
                .collect::<Vec<_>>()
                .join(",")
        );
        // Rough capacity estimate: "?," per column plus the parentheses and comma.
        stmt.reserve(objects.len() * (cols.len() * 2 + 2));
        for _ in 0..objects.len() {
            stmt.push_str(&row);
        }

        // remove the trailing comma
        stmt.pop();

        // Flatten each row's named params into one positional Vec, in column order.
        let mut params = Vec::new();
        for o in objects.iter() {
            let named_params: crate::Params = fun(o).into();
            let positional_params = named_params.into_positional(&cols)?;
            if let crate::Params::Positional(new_params) = positional_params {
                for param in new_params {
                    params.push(param);
                }
            }
        }

        let mut conn = pool.get_conn()?;
        conn.exec_drop(stmt, params)
    }
    // ...
    bulk_insert(
        &pool,
        "table".into(),
        vec!["value1".into(), "value2".into()],
        objects,
        |object| {
            params! {
                "value1" => object.value1,
                "value2" => object.value2,
            }
        },
    )?;
  3. The less efficient but most convenient option is to execute one statement per row. It looks like you can speed this up by setting autocommit to 0 (:warning: run SET autocommit=0 and all of the INSERT queries on one specific connection taken from the pool), but do not forget to set autocommit back to 1. It may look like this (in pseudo-Rust):

    fn bulk_insert<F, P>(pool: my::Pool, stmt: &str, objs: Vec<Object>, fun: F) -> my::Result<()>
    where
        F: Fn(&Object) -> P,
        P: Into<my::Params>,
    {
        let mut conn = pool.get_conn()?;
        conn.query("SET autocommit=0")?;
        {
            // Prepare on the same connection that autocommit was disabled on,
            // so every INSERT runs inside the same session.
            let mut stmt = conn.prepare(stmt)?;
            for obj in objs.iter() {
                stmt.execute(fun(obj))?;
            }
        }
        conn.query("COMMIT")?;
        conn.query("SET autocommit=1")?;
        Ok(())
    }
    // ...
    bulk_insert(pool, "INSERT INTO table (value1, value2) VALUES (:value1, :value2)", objs, |obj| {
        params! {
            "value1" => obj.value1,
            "value2" => obj.value2,
        }
    });
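
One caveat on option 2 is worth making concrete: MySQL rejects a prepared statement with more than 65,535 placeholders, so very large batches must be split. A hypothetical wrapper around the bulk_insert above (bulk_insert_chunked is not part of the crate, and it assumes Obj: Clone for brevity):

    // Hypothetical helper: splits the rows so that each generated INSERT
    // stays under MySQL's 65,535-placeholder limit per prepared statement.
    fn bulk_insert_chunked<F, P>(
        pool: &crate::Pool,
        table: &str,
        cols: &[String],
        objects: &[Obj],
        fun: F,
    ) -> crate::Result<()>
    where
        F: Fn(&Obj) -> P + Copy,
        P: Into<Params>,
    {
        let max_rows = 65_535 / cols.len().max(1);
        for chunk in objects.chunks(max_rows) {
            // `to_vec()` clones the chunk because `bulk_insert` takes owned values.
            bulk_insert(pool, table.to_string(), cols.to_vec(), chunk.to_vec(), fun)?;
        }
        Ok(())
    }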

Diggsey (Author) commented Sep 9, 2016

@blackbeam Thanks for the detailed reply:

  1. This is impossible since I'm using Google's Cloud SQL database; however, it did lead me to find LOAD DATA LOCAL INFILE, which does work.

It looks like, at the protocol level, the MySQL server receives the query containing the filename, then sends the client a request for the contents of that file, which the client sends back. Based on this, it seems the client could actually send data from memory rather than from a file, which opens up a lot of possibilities (a sketch follows at the end of this list).

  2. This is essentially what I've resorted to doing at the moment, but as you say, I'm at the mercy of max_allowed_packet and other arbitrary limits.

  3. I'm already in a transaction, so autocommit is off anyway. The problem is the network latency to the server, because even a few ms of RTT adds up to long delays over 10,000 rows.
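
For illustration, a minimal sketch of that idea using the crate's LocalInfileHandler (API as in the pre-20.x releases; rows is a hypothetical Vec<String> of pre-formatted, tab-separated lines):

    use std::io::Write;

    // `rows` lives in memory; no file is ever read. The filename in the
    // query is a dummy token that the handler simply ignores.
    conn.set_local_infile_handler(Some(LocalInfileHandler::new(move |_file_name, stream| {
        for row in &rows {
            writeln!(stream, "{}", row)?;
        }
        Ok(())
    })));
    conn.query("LOAD DATA LOCAL INFILE 'dummy' INTO TABLE mytable (col1, col2)")?;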

Diggsey (Author) commented Sep 9, 2016

For reference, I'm inserting ~2 million rows into one table with fairly large rows, and ~10 million rows into another table which is much more lightweight.

Currently it takes around 2 hours to insert all the data, even with a batch size of 512 (yep, I discovered that UTF-8 bug ~30 minutes into the import 😒), and despite the large quantity of data I don't think it should take that long.

0xpr03 (Contributor) commented Sep 1, 2018

@Diggsey have you tried something like LOCK TABLES `mytable` WRITE; followed by the inserts and then UNLOCK TABLES; (all on the same connection)? This could also be your bottleneck, and locking first is the way mysqldump works. A rough sketch follows below.
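
A rough sketch of that pattern (table and column names are illustrative; using the exec_drop/query_drop API of the current releases):

    // All three steps must run on the same connection, because
    // LOCK TABLES is per-session state.
    let mut conn = pool.get_conn()?;
    conn.query_drop("LOCK TABLES `mytable` WRITE")?;
    for obj in &objs {
        conn.exec_drop(
            "INSERT INTO `mytable` (value1, value2) VALUES (?, ?)",
            (obj.value1.clone(), obj.value2.clone()), // clone for simplicity
        )?;
    }
    // In real code, make sure the lock is released on the error path too.
    conn.query_drop("UNLOCK TABLES")?;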

Diggsey (Author) commented Sep 1, 2018

@0xpr03 I solved this using LOAD DATA LOCAL INFILE and then installing a custom handler which streamed the data directly from memory.

hpca01 (Contributor) commented Mar 2, 2020

> @0xpr03 I solved this using LOAD DATA LOCAL INFILE and then installing a custom handler which streamed the data directly from memory.

Sorry, do you mind showing an example of the LocalInfileHandler that you implemented? I'm lost as to which traits the handler should implement.

Diggsey (Author) commented Mar 2, 2020

@hpca01 there's an example in the documentation: https://docs.rs/mysql/18.0.0/mysql/struct.LocalInfileHandler.html

hpca01 (Contributor) commented Mar 2, 2020

@Diggsey I apologize, as I'm still new to the language; I got the whole thing to work by doing the following:

// Note: `write!` needs `use std::io::Write;` in scope.
val.conn
    .set_local_infile_handler(Some(LocalInfileHandler::new(move |_, stream| {
        for a_val in input.as_ref() {
            write!(stream, "{}", a_val).unwrap();
        }
        Ok(())
    })));
match val
    .conn
    .query("LOAD DATA LOCAL INFILE 'file' INTO TABLE schema.price_type_pricelineitem (price_type, price_amount, per_unit, linked_ndc_id, price_internal_id)")
{
    Ok(_) => {}
    Err(err) => println!("Error {}", err),
}

I wanted to rewrite it to exclude the closure and have a concrete struct that implements whatever trait is required, to make it easier for the next person.

Diggsey (Author) commented Mar 2, 2020

The trait that's required is FnMut. This is implemented automatically by closures, but implementing FnMut for your own types is still unstable.

I'm not really sure what you hope to make easier by moving it to a struct, but LocalInfileHandler is just a struct, so if you want to encapsulate creating that struct you can just write a function that returns it and hides the fact that you're supplying a closure (see the sketch below)...
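
For example, a minimal sketch of such a wrapper (the function name and the Vec<String> input are illustrative; API as in the 18.x releases):

    use std::io::Write;
    use mysql::LocalInfileHandler;

    // An ordinary constructor function that hides the closure from callers.
    fn infile_handler(rows: Vec<String>) -> LocalInfileHandler {
        LocalInfileHandler::new(move |_file_name, stream| {
            for row in &rows {
                writeln!(stream, "{}", row)?;
            }
            Ok(())
        })
    }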

hpca01 (Contributor) commented Mar 2, 2020

@Diggsey thanks so much for the clear explanation 😄

I will just leave it the way it is, and write some detailed comments instead. Thanks again for your patience, and your work on the crate.

orangesoup commented:

Hey @blackbeam! I have a use case where I need to insert (or update) thousands of rows per second, so my only real option (as far as I know) is your second one: handcrafting the query with ON DUPLICATE KEY UPDATE ....

I've tried to use your pseudo-Rust code with the latest 20.0.1 version; however, I get an error:

conn.exec_drop(stmt, params)
     ^^^^^^^^^ the trait `std::convert::From<mysql::Params>` is not implemented for `mysql::Value`

For objects I've used a VecDeque<SomeStruct>; otherwise I haven't really changed anything, besides using exec_drop instead of prep_exec.

Is there a way to fix my issue? Will the library later offer a "native" option to do these kinds of operations easily?

blackbeam (Owner) commented:

@orangesoup, hi. params should be a Vec of mysql::Value. I've updated the code snippet in #59 (comment).
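
For concreteness, the flattening might look like this (value1/value2 mirror the earlier example and are illustrative; stmt and objects are the statement string and rows from that snippet):

    use mysql::Value;

    // One Value per `?` placeholder, row-major, in the same column order
    // that was used to build the statement string.
    let mut params: Vec<Value> = Vec::with_capacity(objects.len() * 2);
    for o in &objects {
        params.push(Value::from(o.value1.clone()));
        params.push(Value::from(o.value2.clone()));
    }
    conn.exec_drop(stmt, params)?;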

orangesoup commented:

Thanks, will check!

midnightcodr commented Apr 6, 2021

Thanks @blackbeam for sharing the code snippet (the second option). I found a tiny issue with the code: stmt would have a dangling comma at the end, which triggers a MySQL error when exec_drop() is run. To fix that, simply do a

stmt.pop();

right before the line

let mut params = Vec::new();

Other than that, the bulk-insert code works quite nicely.

blackbeam (Owner) commented:

@midnightcodr, I've updated the snippet. Thanks.

midnightcodr commented:

@blackbeam The 2nd option's bulk_insert function signature can be improved even further by making the Obj type generic, that is:

pub fn bulk_insert<F, P, T>(
    pool: &crate::Pool,
    table: String,
    cols: Vec<String>,
    objects: Vec<T>,
    fun: F,
) -> crate::Result<()>
where
    F: Fn(&T) -> P,
    P: Into<Params>,
...

With this change, bulk_insert is much more powerful: I don't have to write a new bulk_insert function (under a new name, of course) if I need to bulk-insert a type other than Obj. I can confirm through testing that making the object type generic works quite well. A usage sketch follows below.
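
As an illustration, the same generic helper can then serve unrelated row types (User, Event, and their fields are hypothetical):

    bulk_insert(&pool, "users".into(), vec!["name".into(), "age".into()], users, |u: &User| {
        params! {
            "name" => u.name.clone(),
            "age" => u.age,
        }
    })?;

    bulk_insert(&pool, "events".into(), vec!["kind".into(), "ts".into()], events, |e: &Event| {
        params! {
            "kind" => e.kind.clone(),
            "ts" => e.ts,
        }
    })?;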
