Skip to content

Commit

Permalink
scan_scroll method.
Browse files Browse the repository at this point in the history
  • Loading branch information
gugod committed May 28, 2014
1 parent e2a7dcc commit e855824
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 0 deletions.
25 changes: 25 additions & 0 deletions lib/Elastijk.pm
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ sub request {

# Perform a single HTTP request and hand back the response untouched.
#
# Takes one argument (the request description), which is passed as-is to
# _build_hijk_request_args to produce the argument hash for Hijk::request.
#
# Returns a two-element list: the "status" and "body" fields of the Hijk
# response, exactly as Hijk returned them.
#
# NOTE: this sub must not write to STDOUT/STDERR; a leftover Data::Dumper
# debug print for GET requests has been removed.
sub request_raw {
    my $args = _build_hijk_request_args($_[0]);
    my $res  = Hijk::request($args);
    return $res->{status}, $res->{body};
}
Expand Down Expand Up @@ -261,6 +262,30 @@ to check the existence of different things:
See also L<http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-exists.html> ,
L<http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-types-exists.html#indices-types-exists> , and L<http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/doc-exists.html>
=head2 scan_scroll( ..., on_response => sub {} )
A way to perform L<scan and scroll|http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/scan-scroll.html>.
The boilerplate to use it is something like:
$es->scan_scroll(
index => "tweet",
body => { query => { match_all => {} }, size => 1000 },
on_response => sub {
my ($status,$res) = @_;
for my $hit (@{ $res->{hits}{hits} }) {
...
}
}
);
The "search_type" is forced to be "scan" in this method.
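The value of the C<on_response> key is a callback subroutine that is called with
each page of scroll results. Its arguments are the HTTP status code and the
response body hash, just like the values returned by other methods. If the
callback returns a defined but false value, scrolling stops early.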
=head1 AUTHORS
=over 4
Expand Down
25 changes: 25 additions & 0 deletions lib/Elastijk/oo.pm
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,29 @@ sub bulk {
return ($status, $res);
}

# Run a "scan and scroll" search and feed every page of results to a callback.
#
# Arguments (named, after the invocant):
#   on_response => CODE  Required. Called as $cb->($status, $res) once for
#                        each scroll page that contains at least one hit.
#                        Returning a defined-but-false value stops scrolling.
#   uri_param   => HASH  Optional. Extra URI parameters for the initial search.
#                        "search_type" is always forced to "scan"; "scroll"
#                        (the scroll keep-alive) defaults to "10m".
#   ...                  Anything else is passed through to $self->get for the
#                        initial "_search" request.
#
# Scrolling ends when a request returns a non-2xx status, when a page comes
# back without hits (which is how Elasticsearch signals that the scroll is
# exhausted), or when the callback asks to stop.
#
# Returns nothing useful. Dies if on_response is not given.
sub scan_scroll {
    my ($self, %args) = @_;

    my $on_response_callback = delete $args{on_response};
    # Fail before any HTTP traffic instead of blowing up in the middle of a scroll.
    die "scan_scroll: the on_response callback is required"
        unless defined $on_response_callback;

    # Work on a copy so the caller's uri_param hash is never modified.
    my %uri_param = %{ delete($args{uri_param}) || {} };
    $uri_param{search_type} = "scan";
    $uri_param{scroll} //= "10m";

    # The initial scan request is only used to obtain the first scroll id;
    # its response is not passed to the callback.
    my ($status, $res) = $self->get(%args, command => "_search", uri_param => \%uri_param);

    while (substr($status, 0, 1) eq '2') {
        # Every response carries the scroll id to use for the next request.
        my $scroll_id = $res->{_scroll_id};

        # The scroll request is not bound to an index; "_search"/"scroll" are
        # passed in the index/type slots, presumably to form "/_search/scroll".
        ($status, $res) = $self->get(
            index     => "_search",
            type      => "scroll",
            uri_param => { scroll => $uri_param{scroll}, scroll_id => $scroll_id },
        );
        last unless substr($status, 0, 1) eq '2';

        # A successful page with no hits means the scroll is exhausted. Without
        # this check the loop would keep requesting empty pages indefinitely.
        my $hits = (ref($res) eq 'HASH' && ref($res->{hits}) eq 'HASH')
            ? $res->{hits}{hits}
            : undef;
        last unless ref($hits) eq 'ARRAY' && @{$hits};

        my $r = $on_response_callback->($status, $res);
        last if defined($r) && !$r;
    }

    return;
}

1;
49 changes: 49 additions & 0 deletions t/live-oo-scan-scroll.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env perl
# Live integration test for Elastijk's scan_scroll method.
#
# It creates a throw-away index, indexes 5000 random documents, scrolls over
# all of them with a page size of 1000 and checks what the on_response
# callback gets to see. Because it talks to a real Elasticsearch node on
# localhost:9200, it only runs when TEST_LIVE is set in the environment.
use strict;
use warnings;
use Test::More;

unless ($ENV{TEST_LIVE}) {
    plan skip_all => "Set env TEST_LIVE=1 to run this test."
}

use Elastijk;

my $document_count = 5000;
my $page_size      = 1000;

# The pid plus a random number keeps concurrent test runs from colliding.
my $test_index_name = "test_index_$$".rand();
my $es = Elastijk->new(host => 'localhost', port => '9200', index => $test_index_name );

## create the index, and index some documents.
# A single shard keeps the page size predictable: each scroll page then
# holds at most $page_size hits.
$es->put(
    body => {
        settings => { index => {number_of_replicas => 0, number_of_shards => 1} },
        mappings => { somedata => { properties => { somestr => { type => "string" }, someint => { type => "long" }}}}
    }
);

## create 5000 documents
# They go into the "somedata" type, the one the mapping above was declared for.
$es->post(
    type => 'somedata',
    body => {
        # Random characters from U+1F300 up to (not including) U+1F560.
        somestr => join("", map { chr(0x1F300 + int(rand(0x260))) } (0..(10+rand()*128))),
        someint => int(rand()*2**16),
    }
) for (1..$document_count);

sleep 2; # wait for refresh.
is $es->count(), $document_count, "count $document_count documents";

## finally, testing scan_scroll
my $count     = 0;
my $hits_seen = 0;
$es->scan_scroll(
    body => { size => $page_size, query => { match_all => {} } },
    on_response => sub {
        my ($status, $res) = @_;
        $count++;
        $hits_seen += @{ $res->{hits}{hits} || [] };
        return 1;
    }
);
is $count, $document_count / $page_size, "on_response is called exactly 5 times.";
is $hits_seen, $document_count, "every document is seen exactly once across all pages";

## delete the index
$es->delete( index => $test_index_name );

done_testing;

0 comments on commit e855824

Please sign in to comment.