Skip to content
Browse files

Added Elastic::Model::Bulk

Closes #16
  • Loading branch information...
1 parent e1e03f6 commit 23d7a1687869c46e9a5126a0bd998a8b1c0f1c27 @clintongormley committed
View
4 lib/Elastic/Model.pm
@@ -324,6 +324,10 @@ C<scrolled_results> C<----> L<Elastic::Model::Results::Scrolled>
C<result> C<--------------> L<Elastic::Model::Result>
+=item *
+
+C<bulk> C<----------------> L<Elastic::Model::Bulk>
+
=back
=head1 SEE ALSO
View
316 lib/Elastic/Model/Bulk.pm
@@ -0,0 +1,316 @@
+package Elastic::Model::Bulk;
+
+use Moose;
+use namespace::autoclean;
+use Data::Dumper;
+
+use Carp;
+
+# Optional callback for conflict errors. commit() only installs a conflict
+# handler on the bulk request when this is set; the handler calls it with
+# the queued doc and the doc re-fetched through the model.
+#===================================
+has 'on_conflict' => (
+#===================================
+ is => 'rw',
+ isa => 'CodeRef',
+);
+
+# Optional callback for errors. commit() only installs an error handler on
+# the bulk request when this is set; the handler calls it with the queued
+# doc and the error it was given.
+#===================================
+has 'on_error' => (
+#===================================
+ is => 'rw',
+ isa => 'CodeRef',
+);
+
+# Queue length at which save() triggers an automatic commit().
+#===================================
+has 'size' => (
+#===================================
+ is => 'ro',
+ isa => 'Int',
+ default => sub {1000}
+);
+
+# Pending bulk actions. `count` (the number of queued actions) is what
+# save() compares against `size` and what commit() checks for emptiness.
+#===================================
+has '_actions' => (
+#===================================
+ is => 'ro',
+ isa => 'ArrayRef',
+ traits => ['Array'],
+ writer => '_set_actions',
+ default => sub { [] },
+ handles => {
+ _push_action => 'push',
+ count => 'count',
+ }
+);
+
+# Docs belonging to the pending actions. save() pushes one doc per action,
+# so position $i here corresponds to position $i in _actions.
+#===================================
+has '_docs' => (
+#===================================
+ is => 'ro',
+ isa => 'ArrayRef',
+ traits => ['Array'],
+ writer => '_set_docs',
+ default => sub { [] },
+ handles => { _push_doc => 'push', }
+);
+
+no Moose;
+
+#===================================
+sub save {
+#===================================
+    # Queue $doc to be written by the next bulk request, committing
+    # automatically once the queue holds `size` actions.
+    #
+    #   $bulk->save( $doc, %args );
+    #
+    # `version` may be passed in %args; all other pairs are merged into the
+    # bulk action, with the UID's write_params taking precedence.
+    # Returns nothing. Croaks if the doc's class declares unique keys or if
+    # the doc is partial.
+    my ( $self, $doc, %args ) = @_;
+
+    # A doc loaded from the store with no changes needs no write.
+    return unless $doc->has_changed || !$doc->uid->from_store;
+
+    my $meta = Class::MOP::class_of($doc);
+    croak "Cannot bulk index class ("
+        . $doc->original_class
+        . ") because it contains unique keys"
+        if $meta->unique_keys;
+
+    # Reject partial docs before calling touch(), so that a refused doc is
+    # not modified as a side effect of the failed save.
+    my $uid = $doc->uid;
+    croak "Cannot save partial doc type ("
+        . $uid->type
+        . ") id ("
+        . $uid->id . ")"
+        if $uid->is_partial;
+
+    $doc->touch;
+
+    my $data    = $self->model->deflate_object($doc);
+    my $version = delete $args{version};
+
+    # Use `index` for docs that came from the store, or that have an ID
+    # together with an explicit version; everything else is a `create`.
+    my $action
+        = ( $uid->from_store || ( $uid->id && defined $version ) )
+        ? 'index'
+        : 'create';
+
+    %args = ( %args, %{ $uid->write_params } );
+
+    # An explicit version (e.g. 0 from overwrite()) overrides the UID's.
+    $args{version} = $version
+        if defined $version;
+
+    $self->_push_action( { $action => { %args, data => $data } } );
+    $self->_push_doc($doc);
+    $self->commit if $self->count >= $self->size;
+    return;
+}
+
+#===================================
+sub overwrite {
+#===================================
+    # Queue a doc through save() with `version => 0` appended to the
+    # caller's arguments.
+    my ( $self, @save_args ) = @_;
+    return $self->save( @save_args, version => 0 );
+}
+
+#===================================
+sub commit {
+#===================================
+    # Send every queued action to the store in one bulk request.
+    #
+    # Returns nothing when the queue is empty, otherwise 1. Dies with a
+    # dump of the first two errors (plus a count of the rest) when the
+    # response lists errors.
+    my $self = shift;
+    return unless $self->count;
+
+    my $actions = $self->_actions;
+    my $docs    = $self->_docs;
+
+    # Empty the queue before sending. clear() installs new arrayrefs, so
+    # the batch captured above is not affected.
+    $self->clear;
+
+    # Only install the handlers the user has supplied. They receive the
+    # captured $docs so the failing position can be mapped back to its doc.
+    my %args = ( actions => $actions );
+    $args{on_conflict} = sub { $self->_on_conflict( $docs, @_ ) }
+        if $self->on_conflict;
+    $args{on_error} = sub { $self->_on_error( $docs, @_ ) }
+        if $self->on_error;
+
+    my $model    = $self->model;
+    my $response = $model->store->bulk(%args);
+    my $results  = $response->{results};
+    my $scope    = $model->current_scope;
+
+    # Results are matched to docs by position. Each result is a
+    # single-pair hash whose value holds the per-doc outcome.
+    for my $i ( 0 .. $#{$docs} ) {
+        my ( undef, $result ) = %{ $results->[$i] };
+        next if $result->{error};
+
+        my $doc = $docs->[$i];
+        $doc->uid->update_from_store($result);
+        $doc->_set_source( $result->{data} );
+
+        # With an active scope, register the saved doc under its namespace.
+        if ($scope) {
+            my $ns = $model->namespace_for_domain( $result->{_index} );
+            $scope->store_object( $ns->name, $doc );
+        }
+    }
+
+    # Require a non-empty list, so that an empty `errors` array is not
+    # reported as a failure.
+    my $unhandled = $response->{errors};
+    if ( $unhandled && @$unhandled ) {
+        local $Data::Dumper::Terse  = 1;
+        local $Data::Dumper::Indent = 1;
+
+        # Dump at most two errors in full and summarise the remainder.
+        my @errors = splice @$unhandled, 0, 2;
+        die "Uncaught errors while committing Bulk:"
+            . Dumper( \@errors )
+            . ( @$unhandled ? "\nand " . ( 0 + @$unhandled ) . " more" : '' );
+    }
+    return 1;
+
+}
+
+#===================================
+sub _on_conflict {
+#===================================
+    # Adapter between the bulk request's conflict handler and the user's
+    # on_conflict callback: looks up the queued doc at position $idx,
+    # fetches a doc for the same UID through the model, and passes both to
+    # the callback. The action name and action data arguments are unused.
+    my ( $self, $docs, undef, undef, $message, $idx ) = @_;
+    my $queued = $docs->[$idx];
+
+    # When the message reports the current version, build a UID pinned to
+    # that version; otherwise reuse a clone of the queued doc's UID.
+    my ($current_version)
+        = $message =~ /: version conflict, current \[(\d+)\]/;
+
+    my $uid
+        = defined $current_version
+        ? Elastic::Model::UID->new(
+            %{ $queued->uid->read_params },
+            version    => $current_version,
+            from_store => 1
+        )
+        : $queued->uid->clone;
+
+    my $stored = $self->model->get_doc( uid => $uid );
+    $self->on_conflict->( $queued, $stored );
+    return;
+
+}
+
+#===================================
+sub _on_error {
+#===================================
+    # Adapter between the bulk request's error handler and the user's
+    # on_error callback: passes the queued doc at position $idx together
+    # with the error. The action name and action data arguments are unused.
+    my ( $self, $docs, undef, undef, $error, $idx ) = @_;
+    $self->on_error->( $docs->[$idx], $error );
+    return;
+
+}
+
+#===================================
+sub clear {
+#===================================
+    # Drop everything that is queued by installing fresh, empty arrayrefs
+    # for both the actions and their docs. Returns whatever the _docs
+    # writer returns.
+    my ($self) = @_;
+    $self->_set_actions( [] );
+    return $self->_set_docs( [] );
+}
+
+#===================================
+sub DEMOLISH {
+#===================================
+    # Flush any docs still queued when the object is destroyed.
+    my $self = shift;
+    $self->commit;
+}
+
+1;
+
+__END__
+
+# ABSTRACT: Bulk-saving of multiple docs for increased throughput
+
+=head1 SYNOPSIS
+
+ $bulk = $model->bulk(
+ size => 1000,
+ on_conflict => sub {...},
+ on_error => sub {...}
+ );
+
+ $bulk->save($doc);
+ $bulk->overwrite($doc);
+ ...
+
+ $bulk->commit;
+
+=head1 DESCRIPTION
+
+If you need to create or update multiple docs at once, then bulk indexing is
+the way to go. It batches up the documents and saves C<size> (default 1000)
+documents in a single request, which is much faster than writing each
+doc individually.
+
+Once you are finished adding docs to the C<$bulk> indexer, call L</commit()>
+to save any docs that haven't been saved yet. If C<$bulk> goes out of scope,
+then L</commit()> will be called for you, but it is safer to call it yourself.
+
+B<Note:> Bulk indexing is not supported for classes which have
+L<unique key constraints|Elastic::Manual::Attributes/unique_key>.
+
+=head1 ATTRIBUTES
+
+=head2 size
+
+The number of docs that will be saved in a single request. Defaults to 1000.
+
+=head2 on_conflict
+
+A callback which will be called if there is any conflict when saving a doc, for
+instance, trying to create a doc that already exists, or trying to save a doc
+when a newer version already exists in Elasticsearch.
+
+The callback is called with two arguments:
+
+=over
+
+=item *
+
+The doc you are trying to save
+
+=item *
+
+The current version of the doc which exists in Elasticsearch
+
+=back
+
+See L<Elastic::Model::Role::Doc/save()> for more.
+
+=head2 on_error
+
+The C<on_error> callback will be called for any non-conflict error (or
+for conflict errors if no L</on_conflict> handler has been specified).
+It is called with two arguments:
+
+=over
+
+=item *
+
+The doc you are trying to save
+
+=item *
+
+The error string returned by Elasticsearch
+
+=back
+
+If no C<on_error> handler is specified, then bulk indexing will die with
+an error message.
+
+=head1 METHODS
+
+=head2 save()
+
+ $bulk->save($doc);
+
+Adds a doc to the internal queue to be saved later.
+
+=head2 overwrite()
+
+ $bulk->overwrite($doc);
+
+Adds a doc to the internal queue to be overwritten later. In other words,
+no version checking is done - if a newer version of the doc exists in
+Elasticsearch, it will be overwritten.
+
+=head2 commit()
+
+ $bulk->commit()
+
+Writes all docs in the queue to Elasticsearch. This is called automatically
+when there are L</size> docs in the queue, or when the C<$bulk> instance
+goes out of scope, although you should call L</commit()> yourself once
+you are finished adding docs, just to be on the safe side.
+
+=head2 clear()
+
+ $bulk->clear()
+
+Clears any docs that are still in the queue.
+
View
1 lib/Elastic/Model/Meta/Class/Model.pm
@@ -61,6 +61,7 @@ has 'classes' => (
results => 'Elastic::Model::Results',
scrolled_results => 'Elastic::Model::Results::Scrolled',
result => 'Elastic::Model::Result',
+ bulk => 'Elastic::Model::Bulk'
};
},
handles => {
View
2 lib/Elastic/Model/Role/Doc.pm
@@ -343,6 +343,8 @@ Saves the C<$doc> to ElasticSearch. If this is a new doc, and a doc with the
same type and ID already exists in the same index, then ElasticSearch
will throw an exception.
+Also see L<Elastic::Model::Bulk> for bulk indexing of multiple docs.
+
If the doc was previously loaded from ElasticSearch, then that doc will be
updated. However, because ElasticSearch uses
L<optimistic locking|http://en.wikipedia.org/wiki/Optimistic_locking>
View
17 lib/Elastic/Model/Role/Model.pm
@@ -16,7 +16,7 @@ use List::MoreUtils qw(uniq);
use namespace::autoclean;
my @wrapped_classes = qw(
domain namespace store view scope
- results scrolled_results result
+ results scrolled_results result bulk
);
for my $class (@wrapped_classes) {
@@ -548,6 +548,10 @@ sub _delete_unique_keys {
}
#===================================
+sub bulk {
+#===================================
+    # Build a new bulk indexer from the class returned by bulk_class,
+    # passing any arguments straight to its constructor.
+    my $self = shift;
+    return $self->bulk_class->new(@_);
+}
+
+#===================================
sub search { shift->store->search(@_) }
#===================================
@@ -818,6 +822,17 @@ Attempting to save a partial doc will cause an error to be thrown.
You shouldn't need to call this method yourself.
+=head3 bulk()
+
+Returns a new instance of L<Elastic::Model::Bulk> for fast indexing
+of multiple docs in batches.
+
+ $bulk = $model->bulk(
+ size => 1000,
+ on_conflict => sub {....},
+ on_error => sub {....}
+ );
+
=head2 Miscellaneous
=head3 namespaces
View
19 lib/Elastic/Model/Role/Store.pm
@@ -60,6 +60,13 @@ sub delete_doc {
return $self->es->delete( %{ $uid->write_params }, %args );
}
+#===================================
+sub bulk {
+#===================================
+    # Hand the given named arguments to the bulk() method of the
+    # underlying `es` object and return its result.
+    my $self = shift;
+    my %args = @_;
+    return $self->es->bulk(%args);
+}
+
1;
__END__
@@ -126,6 +133,18 @@ result. Any failure throws an exception. If the L<version|Elastic::Model::UID/"
number does not match what is stored in ElasticSearch, then a conflict exception
will be thrown. Any C<%args> will be passed to L<ElasticSearch/"delete()">.
+=head2 bulk()
+
+ $result = $store->bulk(
+ actions => $actions,
+ on_conflict => sub {...},
+ on_error => sub {...},
+ %args
+ );
+
+Performs several actions in a single request. Any C<%args> will be passed to
+L<ElasticSearch/bulk()>.
+
=head2 search()
$results = $store->search(@args);
View
66 t/65_bulk/01_bulk.t
@@ -0,0 +1,66 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+use Test::More 0.96;
+use Test::Exception;
+use Test::Deep;
+
+use lib 't/lib';
+
+our $es;
+do 'es.pl';
+
+# Stop the whole run if the application class cannot be loaded. The call is
+# parenthesised because `use_ok 'MyApp' || ...` parses as
+# use_ok( 'MyApp' || ... ), which can never reach the right-hand side.
+use_ok('MyApp') or BAIL_OUT('Cannot load MyApp');
+
+my $model = new_ok( 'MyApp', [ es => $es ], 'Model' );
+isa_ok my $index = $model->namespace('myapp')->index, 'Elastic::Model::Index';
+ok $index->create, 'Create index myapp';
+isa_ok my $domain = $model->domain('myapp'), 'Elastic::Model::Domain';
+
+# A small batch size, so that auto-commit is exercised by the saves below.
+isa_ok my $bulk = $model->bulk( size => 10 ), 'Elastic::Model::Bulk';
+is $bulk->size, 10, 'Bulk size set correctly';
+
+# users() in scalar context yields the number of docs it builds.
+is 0 + users(), 196, 'Have 196 users';
+
+$bulk->save($_) for users();
+
+## COMMIT
+
+# With a batch size of 10, 196 saves are expected to auto-commit 19 full
+# batches (190 docs) and leave 6 docs queued.
+ok $index->refresh, 'Refresh index';
+is $domain->view->search->total, 190, '190 users auto-indexed';
+
+# An explicit commit flushes the remaining 6.
+ok $bulk->commit, 'Commit bulk';
+ok $index->refresh, 'Refresh index';
+is $domain->view->search->total, 196, '196 users auto-indexed';
+
+## CLEAR
+
+ok $index->delete, 'Delete index';
+ok $index->create, 'Create index myapp';
+$bulk->save($_) for users();
+
+# clear() discards the queued docs, so the total stays at 190.
+ok $bulk->clear, 'Clear bulk';
+ok $index->refresh, 'Refresh index';
+is $domain->view->search->total, 190, '190 users auto-indexed';
+
+## SCOPE
+
+ok $index->delete, 'Delete index';
+ok $index->create, 'Create index myapp';
+$bulk->save($_) for users();
+
+# undef($bulk) releases the bulk object; all 196 docs are expected to be
+# indexed afterwards, i.e. the queued remainder is committed on destruction.
+ok !undef($bulk), 'Bulk out of scope';
+ok $index->refresh, 'Refresh index';
+is $domain->view->search->total, 196, '196 users auto-indexed';
+
+## DONE ##
+
+done_testing;
+
+#===================================
+sub users {
+#===================================
+    # Build one new `user` doc per name returned by names(), with IDs
+    # counting up from 1. Returns the docs in list context and their
+    # number in scalar context.
+    my $last_id = 0;
+    my @docs
+        = map { $domain->new_doc( user => { id => ++$last_id, name => $_ } ) }
+        names();
+    return @docs;
+}
+__END__
View
93 t/65_bulk/02_error_handlers.t
@@ -0,0 +1,93 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+use Test::More 0.96;
+use Test::Exception;
+use Test::Deep;
+
+use lib 't/lib';
+
+our $es;
+do 'es.pl';
+
+# Stop the whole run if the application class cannot be loaded. The call is
+# parenthesised because `use_ok 'MyApp' || ...` parses as
+# use_ok( 'MyApp' || ... ), which can never reach the right-hand side.
+use_ok('MyApp') or BAIL_OUT('Cannot load MyApp');
+
+my $model = new_ok( 'MyApp', [ es => $es ], 'Model' );
+isa_ok my $index = $model->namespace('myapp')->index, 'Elastic::Model::Index';
+isa_ok my $domain = $model->domain('myapp'), 'Elastic::Model::Domain';
+isa_ok my $bulk = $model->bulk( size => 10 ), 'Elastic::Model::Bulk';
+
+ok $index->create, 'Create index myapp';
+
+# Seed four users; the handler tests below try to write over them.
+# NOTE(review): these docs are built with a `user` key, whereas test_bulk()
+# uses `name` - confirm which attribute the MyApp user class declares.
+my @users = map { $domain->new_doc( user => $_ ) } (
+    { id => 1, user => 'one' },
+    { id => 2, user => 'two' },
+    { id => 3, user => 'three' },
+    { id => 4, user => 'four' },
+);
+
+$bulk->save($_) for @users;
+$bulk->commit;
+
+# Re-save users 1 and 2 through the domain, outside the bulk indexer.
+# Presumably this bumps their stored versions so that $users[0] and
+# $users[1] become stale - see the conflict cases in test_bulk().
+for ( 1 .. 2 ) {
+ $domain->get( user => $_ )->touch->save;
+}
+
+# Counters and last commit error, reset/filled by test_bulk() and the
+# handler subs below.
+my ( $conflicts, $errors, $error );
+
+# Both handlers installed: conflicts go to on_conflict.
+test_bulk( on_conflict => \&on_conflict, on_error => \&on_error );
+ok $conflicts== 2 && $errors == 0 && !$error,
+ 'on_conflict: 2 conflicts, 0 errors, no error';
+
+# Only on_error installed: the same two conflicts are reported as errors.
+test_bulk( on_error => \&on_error );
+ok $conflicts== 0 && $errors == 2 && !$error,
+ 'on_error: 0 conflicts, 2 errors, no error';
+
+# No handlers: commit is expected to die instead.
+test_bulk();
+ok $conflicts== 0 && $errors == 0 && $error,
+ 'no handler: 0 conflicts, 0 errors, has error';
+
+#===================================
+sub test_bulk {
+#===================================
+    # Reset the counters, queue two conflicting and two overwriting saves
+    # on a fresh bulk indexer built with the given handlers, commit, and
+    # record any exception in $error.
+    my %handlers = @_;
+    ( $conflicts, $errors ) = ( 0, 0 );
+
+    my $batch = $model->bulk( size => 10, %handlers );
+
+    # version conflict
+    $batch->save( $users[0] );
+
+    # version no conflict
+    $batch->overwrite( $users[1] );
+
+    # exists conflict
+    my $duplicate = $domain->new_doc( user => { id => 3, name => 'three' } );
+    $batch->save($duplicate);
+
+    # exists no conflict
+    my $replacement = $domain->new_doc( user => { id => 4, name => 'four' } );
+    $batch->overwrite($replacement);
+
+    eval { $batch->commit };
+    $error = $@;
+}
+
+#===================================
+sub on_conflict {
+#===================================
+    # Conflict handler used by the tests: just counts its invocations.
+    # The two docs it is passed are not inspected.
+    return $conflicts++;
+}
+
+#===================================
+sub on_error {
+#===================================
+    # Error handler used by the tests: just counts its invocations.
+    # The doc and error it is passed are not inspected.
+    return $errors++;
+}
+
+## DONE ##
+
+done_testing;
+
+__END__
View
42 t/65_bulk/03_scope.t
@@ -0,0 +1,42 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+use Test::More 0.96;
+use Test::Exception;
+use Test::Deep;
+use Scalar::Util qw(refaddr);
+
+use lib 't/lib';
+
+our $es;
+do 'es.pl';
+
+# Stop the whole run if the application class cannot be loaded. The call is
+# parenthesised because `use_ok 'MyApp' || ...` parses as
+# use_ok( 'MyApp' || ... ), which can never reach the right-hand side.
+use_ok('MyApp') or BAIL_OUT('Cannot load MyApp');
+
+my $model = new_ok( 'MyApp', [ es => $es ], 'Model' );
+isa_ok my $index = $model->namespace('myapp')->index, 'Elastic::Model::Index';
+ok $index->create, 'Create index myapp';
+isa_ok my $domain = $model->domain('myapp'), 'Elastic::Model::Domain';
+isa_ok my $bulk = $model->bulk( size => 10 ), 'Elastic::Model::Bulk';
+
+# no scope
+my $u1 = $domain->new_doc( user => { id => 1, name => 'one' } );
+$bulk->save($u1);
+$bulk->commit;
+
+# scope active
+isa_ok my $scope = $model->new_scope, 'Elastic::Model::Scope';
+
+my $u2 = $domain->new_doc( user => { id => 2, name => 'two' } );
+$bulk->save($u2);
+$bulk->commit;
+
+# $u1 was committed before the scope existed, so fetching it is expected to
+# give a different object.
+ok refaddr($u1) ne refaddr( $domain->get( user => 1 ) ),
+    'U1 scope not active';
+
+# $u2 was committed with the scope active, so fetching it is expected to
+# give back the very same object.
+is refaddr($u2), refaddr( $domain->get( user => 2 ) ), 'U2 scope active';
+
+done_testing;
+
+__END__

0 comments on commit 23d7a16

Please sign in to comment.
Something went wrong with that request. Please try again.