diff --git a/lib/Elastijk.pm b/lib/Elastijk.pm index 5359342..1d8ef6b 100644 --- a/lib/Elastijk.pm +++ b/lib/Elastijk.pm @@ -39,6 +39,7 @@ sub request { sub request_raw { my $args = _build_hijk_request_args($_[0]); + print Data::Dumper::Dumper($args) if $args->{method} eq "GET"; my $res = Hijk::request($args); return $res->{status}, $res->{body}; } @@ -261,6 +262,30 @@ to check the existence of different things: See also L , L , and L +=head2 scan_scroll( ..., on_response => sub {} ) + +A way to perform L. + +The boilerplate to use it is something like: + + $es->scan_scroll( + index => "tweet", + body => { query => { match_all => {} }, size => 1000 }, + on_response => sub { + my ($status,$res) = @_; + for my $hit (@{ $res->{hits}{hits} }) { + ... + } + } + ); + +The "search_type" is forced to be "scan" in this method. + +The very last value to the C key is a callback subroutine that is +called after each HTTP request. The arguments are HTTP status code and response +body hash just like other methods. + + =head1 AUTHORS =over 4 diff --git a/lib/Elastijk/oo.pm b/lib/Elastijk/oo.pm index fec41e7..e0a1781 100644 --- a/lib/Elastijk/oo.pm +++ b/lib/Elastijk/oo.pm @@ -81,4 +81,29 @@ sub bulk { return ($status, $res); } +sub scan_scroll { + my ($self, %args) = @_; + my $on_response_callback = delete $args{on_response}; + + my %uri_param = %{ delete($args{uri_param}) || {} }; + $uri_param{search_type} = "scan"; + $uri_param{scroll} //= "10m"; + my ($status, $res) = $self->get(%args, command => "_search", uri_param => \%uri_param); + my $scroll_id; + + while (substr($status,0,1) eq '2') { + $scroll_id = $res->{_scroll_id}; + ($status,$res) = $self->get( + index => "_search", type => "scroll", #WTF + uri_param => { scroll => $uri_param{scroll}, scroll_id => $scroll_id } + ); + last unless substr($status,0,1) eq '2'; + print ">> $status, $res->{_scroll_id}\n"; + my $r = $on_response_callback->($status, $res); + if (defined($r) && !$r) { + last; + } + } +} + 1; diff --git a/t/live-oo-scan-scroll.t b/t/live-oo-scan-scroll.t new file mode 100644 index 0000000..4d64372 --- /dev/null +++ b/t/live-oo-scan-scroll.t @@ -0,0 +1,49 @@ +#!/usr/bin/env perl +use strict; +use warnings; +use Test::More; + +unless ($ENV{TEST_LIVE}) { + plan skip_all => "Set env TEST_LIVE=1 to run this test." +} + +use Elastijk; + +my $test_index_name = "test_index_$$".rand(); +my $es = Elastijk->new(host => 'localhost', port => '9200', index => $test_index_name ); +## create the index, and index some documents. +$es->put( + body => { + settings => { index => {number_of_replicas => 0, number_of_shards => 1} }, + mappings => { somedata => { properties => { somestr => { type => "string" }, someint => { type => "long" }}}} + } +); + +## create 5000 documents +$es->post( + type => 'somestr', + body => { + # U+1F30x ~ U+1F56x + somestr => join("", map { chr(rand()*260+0x1F300) } (0..(10+rand()*128))), + someint => int(rand()*2**16), + } +) for (0..4999); + +sleep 2; # wait for refresh. +is $es->count(), 5000, "count 5000 documents"; + +## finally, testing scan_scroll +my $count = 0; +$es->scan_scroll( + body => { size => 1000, query => { match_all => {} } }, + on_response => sub { + $count++; + return 1; + } +); +is $count, 5, "on_response is called exactly 5 times."; + +## delete the index +$es->delete( index => $test_index_name ); + +done_testing;