Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Basic implementation of :streaming #223

Merged
merged 13 commits into from

7 participants

Ben McRedmond Brian Lopez Jeshua Borges Ricardo Amorim Julio Capote Seamus Abshere Dmitriy Kiriyenko
Ben McRedmond

A simple implementation of :streaming using mysql_use_result rather than mysql_store_result. Been working pretty well for me so far.

ext/mysql2/result.c
((17 lines not shown))
}
- if (cacheRows && wrapper->lastRowProcessed == wrapper->numberOfRows) {
- // we've already read the entire dataset from the C result into our
- // internal array. Lets hand that over to the user since it's ready to go
- for (i = 0; i < wrapper->numberOfRows; i++) {
- rb_yield(rb_ary_entry(wrapper->rows, i));
+ if (streaming) {
+ if(!wrapper->streamingComplete) {
+ VALUE row;
+
+ do {
+ row = rb_mysql_result_fetch_row(self, db_timezone, app_timezone, symbolizeKeys, asArray, castBool, cast);
Brian Lopez Owner

would you mind indenting the stuff inside the do block?

Argh, sorry, vim has been acting up for me today on the indentation front. Will fix this and below.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
ext/mysql2/result.c
((58 lines not shown))
VALUE row;
if (cacheRows && i < rowsProcessed) {
row = rb_ary_entry(wrapper->rows, i);
} else {
row = rb_mysql_result_fetch_row(self, db_timezone, app_timezone, symbolizeKeys, asArray, castBool, cast);
if (cacheRows) {
- rb_ary_store(wrapper->rows, i, row);
+ rb_ary_store(wrapper->rows, i, row);
Brian Lopez Owner

also indent here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Brian Lopez
Owner

Initial review looks good!

We should figure out a way to test this. Also, if each is interrupted before iterating over all the results and the user tries to iterate again - what should happens?

Ben McRedmond

Yeah that's an awkward one. Currently, it'll pick up where it left off — which isn't ideal but there aren't really any other options. We can't throw an exception because there needs to be a way for the user to finish iterating through the results (if they want to use this connection again). Thoughts?

README.md
@@ -212,6 +212,21 @@ This is especially helpful since it saves the cost of creating the row in Ruby i
If you only plan on using each row once, then it's much more efficient to disable this behavior by setting the `:cache_rows` option to false.
This would be helpful if you wanted to iterate over the results in a streaming manner. Meaning the GC would cleanup rows you don't need anymore as you're iterating over the result set.
+### Streaming
+
+`Mysql2::Client` can optionally only fetch rows from the server on demand by setting `:streaming => true`. This is handy when handling very large result sets which might not fit in memory on the client.
+
+``` ruby
+result = client.query("SELECT * FROM really_big_Table", :streaming => true)
Brian Lopez Owner

Not a huge deal by any means, but I kinda feel like :stream => true seems like a better name for this? Waddya think?

Yup, I prefer that too.

Yeah. :stream => true looks better for me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Ben McRedmond added some commits
Jeshua Borges

I cannot wait to use this.

Brian Lopez
Owner

Getting a spec failure from "should cache previously yielded results by default" was it passing for you?

Dmitriy Kiriyenko

=)
Never use git add .

Ricardo Amorim

Awesome! Goodbye mysql gem! :D

Ben McRedmond

"should cache previously yielded results by default" is failing for me too, Brian. Weird bug, going to look into it now.

Ben McRedmond

Fixed spec issue, they all run for me now. Any ideas on how to test this, Brian? (Or anyone?)

Brian Lopez
Owner

Thanks for fixing that up.

For testing, maybe we can start by doing something similar to the cache rows tests (checking object ids and such). I'll try and think about how we can more specifically test the fact that we're "streaming".

benmcredmond added some commits
Ben McRedmond benmcredmond Added some tests 07261d9
Ben McRedmond benmcredmond Moved/tidied tests. Bug fix in client.c
The bug fix in client.c is less than ideal but there isn't really
another option without some rearchitecting. Basically, wrapper->active
should not be set to 0 when mysql_use_result is being used until we
have iterated through the full result set (as the wrpaper is infact
still active). Unfortunately, we would need to change this within
Mysql2::Result, which isn't possible at the moment unless we pass the
client to the result object (which seems overkill). So, for now, the
query will fail with a "commands out of sync" error.

# Please enter the commit message for your changes. Lines starting
# with '#' will be ignored, and an empty message aborts the commit.
# On branch master
# Your branch is ahead of 'origin/master' by 1 commit.
#
# Changes to be committed:
#   (use "git reset HEAD <file>..." to unstage)
#
#	modified:   ext/mysql2/client.c
#	modified:   spec/mysql2/client_spec.rb
#	modified:   spec/mysql2/result_spec.rb
#
# Untracked files:
#   (use "git add <file>..." to include in what will be committed)
#
#	Gemfile.lock
d40bc00
Ben McRedmond

I've added some tests for this.

"should yield different value for #first if streaming" and "should yield the same value for #first if not streaming": kind of test that we are streaming, I think they are good for now.

The rest of the tests just make sure that streaming is working properly (and helped me find a bug!).

Julio Capote

+1, this would be awesome to use

Seamus Abshere

This is great to see! @brianmario, what are your plans for it?

Brian Lopez
Owner

I just need to test it some more, definitely want to merge this soon

Ben McRedmond

Brian, any update on this?

Brian Lopez brianmario merged commit e512ab9 into from
Brian Lopez
Owner

Sorry, been crazy busy. I'll do my best to give it another round of testing then push out another release as soon as I can.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Nov 10, 2011
  1. Initial implementation of mysql_use_result, accessible by setting :st…

    Ben McRedmond authored
    …reaming => true when querying
  2. Updated README to describe streaming

    Ben McRedmond authored
  3. fixing vim's messed up indentation

    Ben McRedmond authored
  4. Accidentally committed Gemfile.lock

    Ben McRedmond authored
  5. Think I actually fixed indentation this time

    Ben McRedmond authored
  6. Change :streaming to :stream

    Ben McRedmond authored
  7. More stupid vim indentation

    Ben McRedmond authored
  8. grrrrrrrr

    Ben McRedmond authored
Commits on Dec 3, 2011
  1. Ben McRedmond

    Fix issue with "should cache previously yielded results by default"

    benmcredmond authored
    Was accidentally overwriting wrapper->numberOfRows on iterations after
    first.
  2. Ben McRedmond

    Fixed some icky ifs

    benmcredmond authored
  3. Ben McRedmond

    Consistency++

    benmcredmond authored
Commits on Dec 10, 2011
  1. Ben McRedmond

    Added some tests

    benmcredmond authored
  2. Ben McRedmond

    Moved/tidied tests. Bug fix in client.c

    benmcredmond authored
    The bug fix in client.c is less than ideal but there isn't really
    another option without some rearchitecting. Basically, wrapper->active
    should not be set to 0 when mysql_use_result is being used until we
    have iterated through the full result set (as the wrpaper is infact
    still active). Unfortunately, we would need to change this within
    Mysql2::Result, which isn't possible at the moment unless we pass the
    client to the result object (which seems overkill). So, for now, the
    query will fail with a "commands out of sync" error.
    
    # Please enter the commit message for your changes. Lines starting
    # with '#' will be ignored, and an empty message aborts the commit.
    # On branch master
    # Your branch is ahead of 'origin/master' by 1 commit.
    #
    # Changes to be committed:
    #   (use "git reset HEAD <file>..." to unstage)
    #
    #	modified:   ext/mysql2/client.c
    #	modified:   spec/mysql2/client_spec.rb
    #	modified:   spec/mysql2/result_spec.rb
    #
    # Untracked files:
    #   (use "git add <file>..." to include in what will be committed)
    #
    #	Gemfile.lock
This page is out of date. Refresh to see the latest.
15 README.md
View
@@ -212,6 +212,21 @@ This is especially helpful since it saves the cost of creating the row in Ruby i
If you only plan on using each row once, then it's much more efficient to disable this behavior by setting the `:cache_rows` option to false.
This would be helpful if you wanted to iterate over the results in a streaming manner. Meaning the GC would cleanup rows you don't need anymore as you're iterating over the result set.
+### Streaming
+
+`Mysql2::Client` can optionally only fetch rows from the server on demand by setting `:stream => true`. This is handy when handling very large result sets which might not fit in memory on the client.
+
+``` ruby
+result = client.query("SELECT * FROM really_big_Table", :stream => true)
+```
+
+There are a few things that need to be kept in mind while using streaming:
+
+* `:cache_rows` is ignored currently. (if you want to use `:cache_rows` you probably don't want to be using `:stream`)
+* You must fetch all rows in the result set of your query before you can make new queries. (i.e. with `Mysql2::Result#each`)
+
+Read more about the consequences of using `mysql_use_result` (what streaming is implemented with) here: http://dev.mysql.com/doc/refman/5.0/en/mysql-use-result.html.
+
## ActiveRecord
To use the ActiveRecord driver (with or without rails), all you should need to do is have this gem installed and set the adapter in your database.yml to "mysql2".
29 ext/mysql2/client.c
View
@@ -9,7 +9,7 @@
VALUE cMysql2Client;
extern VALUE mMysql2, cMysql2Error;
static VALUE intern_encoding_from_charset;
-static VALUE sym_id, sym_version, sym_async, sym_symbolize_keys, sym_as, sym_array;
+static VALUE sym_id, sym_version, sym_async, sym_symbolize_keys, sym_as, sym_array, sym_stream;
static ID intern_merge, intern_error_number_eql, intern_sql_state_eql;
#define REQUIRE_OPEN_DB(wrapper) \
@@ -274,21 +274,32 @@ static VALUE nogvl_read_query_result(void *ptr) {
return res == 0 ? Qtrue : Qfalse;
}
-/* mysql_store_result may (unlikely) read rows off the socket */
-static VALUE nogvl_store_result(void *ptr) {
+static VALUE nogvl_do_result(void *ptr, char use_result) {
mysql_client_wrapper *wrapper;
MYSQL_RES *result;
wrapper = (mysql_client_wrapper *)ptr;
- result = mysql_store_result(wrapper->client);
+ if(use_result) {
+ result = mysql_use_result(wrapper->client);
+ } else {
+ result = mysql_store_result(wrapper->client);
+ }
// once our result is stored off, this connection is
// ready for another command to be issued
wrapper->active = 0;
-
return (VALUE)result;
}
+/* mysql_store_result may (unlikely) read rows off the socket */
+static VALUE nogvl_store_result(void *ptr) {
+ return nogvl_do_result(ptr, 0);
+}
+
+static VALUE nogvl_use_result(void *ptr) {
+ return nogvl_do_result(ptr, 1);
+}
+
static VALUE rb_mysql_client_async_result(VALUE self) {
MYSQL_RES * result;
VALUE resultObj;
@@ -308,7 +319,12 @@ static VALUE rb_mysql_client_async_result(VALUE self) {
return rb_raise_mysql2_error(wrapper);
}
- result = (MYSQL_RES *)rb_thread_blocking_region(nogvl_store_result, wrapper, RUBY_UBF_IO, 0);
+ VALUE is_streaming = rb_hash_aref(rb_iv_get(self, "@query_options"), sym_stream);
+ if(is_streaming == Qtrue) {
+ result = (MYSQL_RES *)rb_thread_blocking_region(nogvl_use_result, wrapper, RUBY_UBF_IO, 0);
+ } else {
+ result = (MYSQL_RES *)rb_thread_blocking_region(nogvl_store_result, wrapper, RUBY_UBF_IO, 0);
+ }
if (result == NULL) {
if (mysql_field_count(wrapper->client) != 0) {
@@ -780,6 +796,7 @@ void init_mysql2_client() {
sym_symbolize_keys = ID2SYM(rb_intern("symbolize_keys"));
sym_as = ID2SYM(rb_intern("as"));
sym_array = ID2SYM(rb_intern("array"));
+ sym_stream = ID2SYM(rb_intern("stream"));
intern_merge = rb_intern("merge");
intern_error_number_eql = rb_intern("error_number=");
3  ext/mysql2/client.h
View
@@ -39,4 +39,5 @@ typedef struct {
MYSQL *client;
} mysql_client_wrapper;
-#endif
+#endif
+
103 ext/mysql2/result.c
View
@@ -55,7 +55,7 @@ static VALUE intern_encoding_from_charset;
static ID intern_new, intern_utc, intern_local, intern_encoding_from_charset_code,
intern_localtime, intern_local_offset, intern_civil, intern_new_offset;
static VALUE sym_symbolize_keys, sym_as, sym_array, sym_database_timezone, sym_application_timezone,
- sym_local, sym_utc, sym_cast_booleans, sym_cache_rows, sym_cast;
+ sym_local, sym_utc, sym_cast_booleans, sym_cache_rows, sym_cast, sym_stream;
static ID intern_merge;
static void rb_mysql_result_mark(void * wrapper) {
@@ -392,7 +392,7 @@ static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self) {
ID db_timezone, app_timezone, dbTz, appTz;
mysql2_result_wrapper * wrapper;
unsigned long i;
- int symbolizeKeys = 0, asArray = 0, castBool = 0, cacheRows = 1, cast = 1;
+ int symbolizeKeys = 0, asArray = 0, castBool = 0, cacheRows = 1, cast = 1, streaming = 0;
GetMysql2Result(self, wrapper);
@@ -423,6 +423,14 @@ static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self) {
cast = 0;
}
+ if(rb_hash_aref(opts, sym_stream) == Qtrue) {
+ streaming = 1;
+ }
+
+ if(streaming && cacheRows) {
+ rb_warn("cacheRows is ignored if streaming is true");
+ }
+
dbTz = rb_hash_aref(opts, sym_database_timezone);
if (dbTz == sym_local) {
db_timezone = intern_local;
@@ -445,48 +453,77 @@ static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self) {
}
if (wrapper->lastRowProcessed == 0) {
- wrapper->numberOfRows = mysql_num_rows(wrapper->result);
- if (wrapper->numberOfRows == 0) {
+ if(streaming) {
+ // We can't get number of rows if we're streaming,
+ // until we've finished fetching all rows
+ wrapper->numberOfRows = 0;
wrapper->rows = rb_ary_new();
- return wrapper->rows;
+ } else {
+ wrapper->numberOfRows = mysql_num_rows(wrapper->result);
+ if (wrapper->numberOfRows == 0) {
+ wrapper->rows = rb_ary_new();
+ return wrapper->rows;
+ }
+ wrapper->rows = rb_ary_new2(wrapper->numberOfRows);
}
- wrapper->rows = rb_ary_new2(wrapper->numberOfRows);
}
- if (cacheRows && wrapper->lastRowProcessed == wrapper->numberOfRows) {
- // we've already read the entire dataset from the C result into our
- // internal array. Lets hand that over to the user since it's ready to go
- for (i = 0; i < wrapper->numberOfRows; i++) {
- rb_yield(rb_ary_entry(wrapper->rows, i));
- }
- } else {
- unsigned long rowsProcessed = 0;
- rowsProcessed = RARRAY_LEN(wrapper->rows);
- for (i = 0; i < wrapper->numberOfRows; i++) {
+ if (streaming) {
+ if(!wrapper->streamingComplete) {
VALUE row;
- if (cacheRows && i < rowsProcessed) {
- row = rb_ary_entry(wrapper->rows, i);
- } else {
+
+ do {
row = rb_mysql_result_fetch_row(self, db_timezone, app_timezone, symbolizeKeys, asArray, castBool, cast);
- if (cacheRows) {
- rb_ary_store(wrapper->rows, i, row);
+
+ if (block != Qnil) {
+ rb_yield(row);
+ wrapper->lastRowProcessed++;
}
- wrapper->lastRowProcessed++;
+ } while(row != Qnil);
+
+ rb_mysql_result_free_result(wrapper);
+
+ wrapper->numberOfRows = wrapper->lastRowProcessed;
+ wrapper->streamingComplete = 1;
+ } else {
+ rb_raise(cMysql2Error, "You have already fetched all the rows for this query and streaming is true. (to reiterate you must requery).");
+ }
+ } else {
+ if (cacheRows && wrapper->lastRowProcessed == wrapper->numberOfRows) {
+ // we've already read the entire dataset from the C result into our
+ // internal array. Lets hand that over to the user since it's ready to go
+ for (i = 0; i < wrapper->numberOfRows; i++) {
+ rb_yield(rb_ary_entry(wrapper->rows, i));
}
+ } else {
+ unsigned long rowsProcessed = 0;
+ rowsProcessed = RARRAY_LEN(wrapper->rows);
+ for (i = 0; i < wrapper->numberOfRows; i++) {
+ VALUE row;
+ if (cacheRows && i < rowsProcessed) {
+ row = rb_ary_entry(wrapper->rows, i);
+ } else {
+ row = rb_mysql_result_fetch_row(self, db_timezone, app_timezone, symbolizeKeys, asArray, castBool, cast);
+ if (cacheRows) {
+ rb_ary_store(wrapper->rows, i, row);
+ }
+ wrapper->lastRowProcessed++;
+ }
+
+ if (row == Qnil) {
+ // we don't need the mysql C dataset around anymore, peace it
+ rb_mysql_result_free_result(wrapper);
+ return Qnil;
+ }
- if (row == Qnil) {
+ if (block != Qnil) {
+ rb_yield(row);
+ }
+ }
+ if (wrapper->lastRowProcessed == wrapper->numberOfRows) {
// we don't need the mysql C dataset around anymore, peace it
rb_mysql_result_free_result(wrapper);
- return Qnil;
}
-
- if (block != Qnil) {
- rb_yield(row);
- }
- }
- if (wrapper->lastRowProcessed == wrapper->numberOfRows) {
- // we don't need the mysql C dataset around anymore, peace it
- rb_mysql_result_free_result(wrapper);
}
}
@@ -514,6 +551,7 @@ VALUE rb_mysql_result_to_obj(MYSQL_RES * r) {
wrapper->fields = Qnil;
wrapper->rows = Qnil;
wrapper->encoding = Qnil;
+ wrapper->streamingComplete = 0;
rb_obj_call_init(obj, 0, NULL);
return obj;
}
@@ -551,6 +589,7 @@ void init_mysql2_result() {
sym_application_timezone = ID2SYM(rb_intern("application_timezone"));
sym_cache_rows = ID2SYM(rb_intern("cache_rows"));
sym_cast = ID2SYM(rb_intern("cast"));
+ sym_stream = ID2SYM(rb_intern("stream"));
opt_decimal_zero = rb_str_new2("0.0");
rb_global_variable(&opt_decimal_zero); //never GC
1  ext/mysql2/result.h
View
@@ -11,6 +11,7 @@ typedef struct {
unsigned int numberOfFields;
unsigned long numberOfRows;
unsigned long lastRowProcessed;
+ char streamingComplete;
char resultFreed;
MYSQL_RES *result;
} mysql2_result_wrapper;
16 spec/mysql2/client_spec.rb
View
@@ -92,6 +92,22 @@ def connect *args
end
context "#query" do
+ it "should let you query again if iterating is finished when streaming" do
+ @client.query("SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false).each {}
+
+ expect {
+ @client.query("SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false)
+ }.to_not raise_exception(Mysql2::Error)
+ end
+
+ it "should not let you query again if iterating is not finished when streaming" do
+ @client.query("SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false).first
+
+ expect {
+ @client.query("SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false)
+ }.to raise_exception(Mysql2::Error)
+ end
+
it "should only accept strings as the query parameter" do
lambda {
@client.query ["SELECT 'not right'"]
19 spec/mysql2/result_spec.rb
View
@@ -73,6 +73,25 @@
result = @client.query "SELECT 1", :cache_rows => false
result.first.object_id.should_not eql(result.first.object_id)
end
+
+ it "should yield different value for #first if streaming" do
+ result = @client.query "SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false
+ result.first.should_not eql(result.first)
+ end
+
+ it "should yield the same value for #first if streaming is disabled" do
+ result = @client.query "SELECT 1 UNION SELECT 2", :stream => false
+ result.first.should eql(result.first)
+ end
+
+ it "should throw an exception if we try to iterate twice when streaming is enabled" do
+ result = @client.query "SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false
+
+ expect {
+ result.each {}
+ result.each {}
+ }.to raise_exception(Mysql2::Error)
+ end
end
context "#fields" do
Something went wrong with that request. Please try again.