diff --git a/lib/Elastic/Manual/Reindex.pod b/lib/Elastic/Manual/Reindex.pod new file mode 100644 index 0000000..5603df8 --- /dev/null +++ b/lib/Elastic/Manual/Reindex.pod @@ -0,0 +1,139 @@ +package Elastic::Manual::Reindex; + +# ABSTRACT: How to reindex your data from an old index to a new index + +=head1 INTRODUCTION + +While you can add to the L of +an index, you can't change what is already there. Especially during development, +you will need to L your data to a new +index. + +=head1 USE ALIASES INSTEAD OF INDICES + +The easiest way to work is to have the L +be an L which points at the +current version of your index. For instance: + + my $ns = $model->namespace( 'myapp' ); + $ns->index( 'myapp_v1' )->create; + $ns->alias->to( 'myapp_v1' ); + +Now you're ready to start indexing data into C: + + my $domain = $model->domain( 'myapp' ); + $domain->create( user => { name => 'John'} ); + +When you need to change your mapping, you can just reindex to a new index: + + # create 'myapp_v2' if it doesn't exist, and + # copy 'myapp_v1' to 'myapp_v2' + $ns->index( 'myapp_v2' )->reindex( 'myapp' ); + + # update alias 'myapp' to point to 'myapp_v2' + $ns->alias->to( 'myapp_v2' ); + + # delete the old 'myapp_v1' + $ns->index( 'myapp_v1' )->delete; + + +=head1 UPDATING UIDS + +Imagine you have a C<$post> object which has a C attribute. The +L of the user is stored in ElasticSearch, which +includes the index name. + +When you reindex your data from C to C, +L will automatically update +all UIDs in the reindexed data to point to the new index. + +=head1 UPDATING UIDS IN OTHER INDICES + +Now imagine that you have another index (one you're not reindexing) which also +has UIDs which point to the old index. These will no longer be valid. You +need to update the old UIDs to point to the new index. 
+ +You can do this with: + + $ns->index( 'myapp_v2' )->reindex( + domain => 'myapp_v1', + repoint_uids => 1 + ); + +This will automatically find all UIDs in any index known to your +model and update them. + +If you don't want to do this in a single step, you can do it in two: + + $index = $ns->index( 'myapp_v2' ); + $index->reindex( 'myapp_v1' ); + $index->repoint_uids( index_map => { myapp_v1 => 'myapp_v2' }) ; + +=head1 CHANGING DOC STRUCTURE WHILE REINDEXING + +Perhaps, when reindexing, you need to change the structure of the +document. For instance, perhaps you have an attribute C<foo> that was an +array reference but is now a single value. + +You can pass a C<transform> coderef which will be called with the raw doc +as its first parameter: + + $index->reindex( + domain => 'myapp_v1', + repoint_uids => 1, + transform => sub { + my $doc = shift; + $doc->{_source}{foo} = $doc->{_source}{foo}[0]; + } + ); + +=head1 REINDEXING MULTIPLE INDICES OR PARTIAL INDICES + +Instead of passing the C<domain> parameter, you can pass a +L<view|Elastic::Model::View> which gives you the flexibility to combine +multiple indices into one, or to move part of an index into a separate +index. For instance: + + # combine multiple indices + my $view = $model->view( domain => ['index_1','index_2']); + $index->reindex( $view ); + + # reindex part of an index + my $view = $model->view( domain => 'index_1', type => 'big_type' ); + $index->reindex( $view ); + +B<Note:> the second example (separating out part of an index) can be tricky. +By default, the L<uid_updater()|Elastic::Model::Index/uid_updater()> +performs its magic on B<any> UID that includes the old index name. +However, this may not always be what you want. + +For a custom requirement such as this, the C<transform> coderef is called +with a second parameter, which acts as a flag. 
By setting this to a true value,
+you can prevent the automatic remapper from working:
+
+    $index->reindex(
+        view      => $view,
+        transform => sub {
+            my ($doc) = @_;
+            $_[1] = 1;                 # Don't remap UIDs automatically
+            handle_remapping($doc);    # I'll do it myself
+        }
+    );
+
+=head1 TODO
+
+=over
+
+=item *
+
+Reindex in parallel
+
+=item *
+
+Reindex a live index
+
+=item *
+
+Keep two indices in sync
+
+=back
diff --git a/lib/Elastic/Model/Index.pm b/lib/Elastic/Model/Index.pm
index e00a77b..c1a6de7 100644
--- a/lib/Elastic/Model/Index.pm
+++ b/lib/Elastic/Model/Index.pm
@@ -18,6 +18,269 @@ sub create {
     return $self;
 }
 
+# Copy the docs selected by a view (or by the `domain` argument) into the
+# index named $self->name, creating that index first if it does not exist.
+#
+# Accepts a single domain name, a single view object, a single hashref, or
+# a list of key/value pairs: view, domain, size, scan, quiet, transform,
+# index_map, repoint_uids, on_conflict, on_error, uid_on_conflict,
+# uid_on_error. Croaks unless a view or a domain is supplied.
+#
+# UIDs in the copied docs that refer to a source index are rewritten by
+# uid_updater(). If `repoint_uids` is true, repoint_uids() is called
+# afterwards; a reference passed as `repoint_uids` is used as its view.
+#===================================
+sub reindex {
+#===================================
+    my $self = shift;
+    my %args
+        = @_ != 1 ? @_
+        : !ref $_[0] ? ( domain => shift() )
+        : ref $_[0] eq 'HASH' ? %{ shift() }
+        :                       ( view => shift() );
+
+    my $verbose = !$args{quiet};
+    my $view    = $args{view};
+    my $scan    = $args{scan} || '2m';
+    my $size    = $args{size} || 1000;
+
+    unless ($view) {
+        my $domain = $args{domain}
+            or croak "No (view) or (domain) passed to reindex()";
+        $view = $self->model->view( domain => $domain );
+    }
+
+    $view = $view->size($size)
+        unless $view->_has_size;
+
+    # if view has a filter already, then combine it with the query
+    # before setting the new filter
+    $view = $view->query( $view->_build_query )
+        if $view->filter;
+
+    unless ( $self->exists ) {
+        print "Creating index (" . $self->name . ")\n"
+            if $verbose;
+        $self->create();
+    }
+
+    my $transform = $args{transform};
+    my $index_map = $self->_index_names( $view, $args{index_map} );
+    if ( my $used = $self->_used_index_names( $view, $index_map ) ) {
+        if ($verbose) {
+            print join "\n", "Remapping UID indices:",
+                map { " $_ -> " . $used->{$_} }
+                sort keys %$used;
+            print "\n";
+        }
+        $transform = $self->uid_updater( $used, $transform );
+    }
+
+    $self->model->es->reindex(
+        dest_index => $self->name,
+        source     => $view->scan($scan)->as_elements,
+        bulk_size  => $view->size,
+        quiet      => !$verbose,
+        transform  => $transform,
+        map { $_ => $args{$_} } qw(on_conflict on_error),
+    );
+
+    my $repoint = $args{repoint_uids}
+        or return;
+
+    $self->repoint_uids(
+        index_map => $index_map,
+        quiet     => $args{quiet},
+        scan      => $scan,
+        size      => $size,
+        view      => ref $repoint ? $repoint : undef,
+        map { $_ => $args{"uid_$_"} } qw(on_conflict on_error),
+    );
+
+}
+
+# Rewrite UIDs whose index is a key of `index_map` (old name => new name)
+# in every index found by the view, except this index and the old indices
+# themselves. Each affected index is reindexed onto itself, and the
+# _version of every doc passed through the transform is incremented.
+#
+# Named args: index_map (required - croaks if missing), view, scan, size,
+# quiet, on_conflict, on_error.
+#===================================
+sub repoint_uids {
+#===================================
+    my ( $self, %args ) = @_;
+
+    my $map = $args{index_map}
+        or croak "No (index_map) passed to repoint_uids";
+
+    my %exclude = ( $self->name => 1, %$map );
+    my $verbose = !$args{quiet};
+    my $scan    = $args{scan} || '2m';
+    my $view    = $args{view} || $self->model->view;
+    my $size    = $args{size} || 1000;
+
+    $view = $view->query( $view->_build_query )
+        if $view->filter;
+
+    $view = $view->size($size) unless $view->_has_size;
+    $view = $view->filterb( 'uid.index' => [ keys %$map ] );
+
+    my @indices
+        = grep { !$exclude{$_} } $self->_values_in_field( $view, '_index' );
+
+    print "\nRepointing UIDs to use index " . $self->name . "\n"
+        if $verbose;
+
+    unless (@indices) {
+        print "No indices to update\n" if $verbose;
+        return;
+    }
+
+    $view = $view->domain(@indices);
+
+    my $transform = $self->uid_updater( $map, sub { $_[0]->{_version}++ } );
+
+    for my $index (@indices) {
+        print "Updating index: $index\n" if $verbose;
+        $self->model->es->reindex(
+            dest_index => $index,
+            source     => $view->domain($index)->scan($scan)->as_elements,
+            bulk_size  => $size,
+            quiet      => !$verbose,
+            transform  => $transform,
+            map { $_ => $args{$_} } qw(on_conflict on_error),
+        );
+    }
+
+    print "Finished repointing UIDs\n" if $verbose;
+}
+
+# Return a hashref of { source index name => new index name }. Entries in
+# the optional $init hashref are kept; any other index holding docs matched
+# by $view is mapped to $self->name.
+#===================================
+sub _index_names {
+#===================================
+    my ( $self, $view, $init ) = @_;
+
+    my $index_name = $self->name;
+    # not `my %map = %$init if $init`: perlsyn documents `my` combined with
+    # a statement modifier as undefined behaviour
+    my %map = $init ? %$init : ();
+
+    $view = $view->size(0);
+
+    # all indices involved in the source
+    $map{$_} ||= $index_name for $self->_values_in_field( $view, '_index' );
+
+    return \%map;
+
+}
+
+# Return the entries of $all (hashref of old => new index names) whose old
+# name occurs in the `uid.index` field of docs matched by $view, or undef
+# if there are none.
+#===================================
+sub _used_index_names {
+#===================================
+    my ( $self, $view, $all ) = @_;
+
+    $view = $view->size(0);
+
+    # any uses of these indices in uid.index
+    $view = $view->filterb( 'uid.index' => [ keys %$all ] );
+
+    # a matching doc may also hold UIDs pointing at indices we are not
+    # remapping, so keep only the names that have a mapping
+    my %map = map { $_ => $all->{$_} }
+        grep { defined $all->{$_} }
+        $self->_values_in_field( $view, 'uid.index' );
+
+    return %map ? \%map : undef;
+}
+
+# Return a coderef which takes a raw doc and rewrites, in place, the index
+# of every { uid => { index => ..., type => ... } } hash found anywhere
+# under $doc->{_source}, whenever $map has a true entry for that index.
+#
+# If $transform is given, the coderef first calls $transform->( $doc, $flag );
+# the transform may set $_[1] to a true value to skip the UID rewrite for
+# that doc. The doc itself is returned in both cases.
+#===================================
+sub uid_updater {
+#===================================
+    my ( $self, $map, $transform ) = @_;
+    my $mapper = sub {
+        my $doc   = shift;
+        my @stack = values %{ $doc->{_source} };
+
+        # loop on the stack size, not on the value: a false value (0, '' or
+        # undef) in the doc must not end the walk early
+        while (@stack) {
+            my $val = shift @stack;
+            unless ( ref $val eq 'HASH' ) {
+                push @stack, @$val if ref $val eq 'ARRAY';
+                next;
+            }
+            my ( $uid, $index );
+
+            if (    $uid = $val->{uid}
+                and ref $uid eq 'HASH'
+                and $index = $uid->{index}
+                and $uid->{type} )
+            {
+                if ( my $new = $map->{$index} ) {
+                    $uid->{index} = $new;
+                }
+            }
+            else {
+                push @stack, values %$val;
+            }
+        }
+        return $doc;
+    };
+
+    return $mapper unless $transform;
+
+    return sub {
+        my $no_remap = 0;
+
+        # $no_remap is passed as an alias, so the transform can set $_[1]
+        $transform->( @_, $no_remap );
+        $mapper->(@_) unless $no_remap;
+
+        # always hand the doc back, even when the remapper was skipped
+        return $_[0];
+    };
+
+}
+
+# Return the list of terms found in $field across the docs matched by
+# $view, using a terms facet. $size (default 100) is the number of terms
+# requested; if the facet reports occurrences that did not fit, the search
+# is repeated with a larger $size.
+#===================================
+sub _values_in_field {
+#===================================
+    my ( $self, $view, $field, $size ) = @_;
+    $size ||= 100;
+
+    my $facet
+        = $view->facets(
+        field => { terms => { field => $field, size => $size } } )
+        ->search->facet('field');
+
+    # `other` counts the term occurrences that did not fit into $size
+    # (`missing` is the number of docs with no value for the field), so ask
+    # again with room for all of them
+    if ( my $other = $facet->{other} ) {
+        return $self->_values_in_field( $view, $field, $size + $other );
+    }
+
+    return map { $_->{term} } @{ $facet->{terms} || [] };
+}
+
 1;
 
 __END__
@@ -31,6 +294,13 @@ __END__
 
     $index->create( settings => \%settings );
 
+    $index->reindex( 'old_index' );
+
+    $index->reindex(
+        domain       => 'old_index',
+        repoint_uids => 1
+    );
+
 See also L.
 
 =head1 DESCRIPTION
@@ -67,6 +337,250 @@ L, pass in a list of C<@types>.
$index->create( types => ['user','post' ]); +=head2 reindex() + + # reindex $domain_name to $index->name + $index->reindex( $domain_name ); + + # reindex the data returned by $view to $index->name + $index->reindex( $view ); + + # more options + $index->reindex( + domain => $domain, + OR view => $view, + + size => 1000, + repoint_uids => 1 | $other_view + transform => sub {...}, + scan => '2m', + quiet => 0, + + on_conflict => sub {...} | 'IGNORE' + on_error => sub {...} | 'IGNORE' + uid_on_conflict => sub {...} | 'IGNORE' + uid_on_error => sub {...} | 'IGNORE' + ); + +While you can add to the L of +an index, you can't change what is already there. Especially during development, +you will need to reindex your data to a new index. + +L</reindex()> reindexes your data from L +C<$domain_name> (or the results returned by L C<$view>) +into an index called C<< $index->name >>. The new index is created if it +doesn't already exist. + +See L<Elastic::Manual::Reindex> for more about reindexing strategies. The +documentation below explains what each parameter does: + +=over + +=item size + +The C<size> parameter defaults to 1,000. It has two effects: it controls +how many documents are pulled from the C<domain> or C<view>, and how many +documents are batched together to index into the new index. + +You can control the first separately by setting a +L on the C<view>: + + $index->reindex( + size => 200, # index 200 at a time + view => $model->view( + domain => 'myapp_v1', + size => 100 # pull max of 100 * primary_shards + ), + ); + +B<Note:> documents are pulled from the C<domain>/C<view> using +L, which can pull a maximum of +L C<* number_of_primary_shards> in a single +request. If you have large docs or underpowered servers, you may want to +change the L parameter. + +=item scan + +C<scan> is the same as L - it controls how long +ElasticSearch should keep the "scroll" live between requests. Defaults to +'2m'. Increase this if the reindexing process is slow and you get +scroll timeouts. 
+ +=item repoint_uids + +If true, L</repoint_uids()> will be called automatically to update any +L (which point at the old index) in indices other +than the ones currently being reindexed. + + $index->reindex( + domain => 'myapp_v1', + repoint_uids => 1 # updates UIDs in any other_index which + # point to 'myapp_v1' + ); + +For more advanced control, you can pass a L<view|Elastic::Model::View>: + + my $repoint = $model->view->filterb(...some subset of documents...); + $index->reindex( + domain => 'myapp_v1', + repoint_uids => $repoint # updates UIDs just in $repoint which + # point to 'myapp_v1' + ); + +=item transform + +C<transform> accepts a coderef which is called before indexing each doc. +You can use it to make structural changes to the doc, for instance, changing +attribute C<foo> from an array reference to a single value: + + $index->reindex( + domain => 'myapp_v1', + transform => sub { + my ($doc) = @_; + $doc->{_source}{foo} = $doc->{_source}{foo}[0] + } + ); + +You can also disable the automatic UID remapper with the second parameter: + + $index->reindex( + domain => 'myapp_v1', + transform => sub { + my ($doc) = @_; + $_[1] = 1; # disable UID remapper + $doc->{_source}{foo} = $doc->{_source}{foo}[0] + } + ); + +=item on_conflict / on_error + +If you are indexing to the new index at the same time as you are reindexing, +you may get document conflicts. You can handle the conflicts with a coderef +callback, or ignore them by setting C<on_conflict> to C<'IGNORE'>: + + $index->reindex( + domain => 'myapp_v2', + on_conflict => 'IGNORE', + ); + +Similarly, you can pass an C<on_error> handler which will handle other errors, +or all errors if no C<on_conflict> handler is defined. + +See L for more. + +=item uid_on_conflict / uid_on_error + +These work in the same way as the C<on_conflict> or C<on_error> handlers, +but are passed to L</repoint_uids()> if C<repoint_uids> is true. + +=item quiet + +By default, L</reindex()> prints out progress information. 
To silence this, +set C to true: + + $index->reindex( + domain => 'myapp_v2', + quiet => 1 + ); + +=back + +=head2 repoint_uids() + + $index->repoint_uids( + index_map => \%index_map, + view => $view, + scan => '2m', + size => 1000, + quiet => 0, + + on_conflict => sub {...} | 'IGNORE' + on_error => sub {...} | 'IGNORE' + ) + +The purpose of L is to update L to +point to an old index which has been reindexed. +Normally, it would be called automatically from L. +However, for more fine-grained control, you can call L +yourself. + +Parameters: + +=over + +=item index_map + +This is a required parameter, and maps old index names to new. For instance: + + $index->repoint_uids( + index_map => { + old_index_1 => 'new_index', + old_index_2 => 'new_index', + } + ); + +=item view + +Normally, all L known to the +L will be updated. However, if you +want to restrict which docs are updated, you can pass in a +L instead. + + $index->repoint_uids( + index_map => \%index_map, + view => $model->view->filterb(....) + ); + +=item size + +This is the same as the C parameter to L. + +=item scan + +This is the same as the C parameter to L. + +=item quiet + +This is the same as the C parameter to L. + +=item on_conflict / on_error + +These are the same as the C and C handlers +in L. + +=back + +=head2 uid_updater() + + $coderef = $index->uid_updater(\%map); + $coderef = $index->uid_updater(\%map,$transform); + +L is used by L and L to update +the C value of any L to point to a new index. +It accepts a C<\%map> of index names, eg: + + { + old_index_1 => 'new_index', + old_index_2 => 'new_index' + } + +It accepts a second optional C<$transform> parameter, which should be +a coderef. C<$transform> (if passed) will be called (before the UID updater) +for each doc in the reindexing process, with the raw ElasticSearch doc as its +first argument. 
+ +The second argument is used as a flag for disabling the automatic UID +remapper: + + $coderef = $index->uid_updater( + \%index_map, + sub { + my ($doc) = @_; + $_[1] = 1; # disable UID remapper + + } + ); + =head1 IMPORTED ATTRIBUTES Attributes imported from L diff --git a/lib/Elastic/Model/Role/Model.pm b/lib/Elastic/Model/Role/Model.pm index bf18a14..c491e77 100644 --- a/lib/Elastic/Model/Role/Model.pm +++ b/lib/Elastic/Model/Role/Model.pm @@ -3,7 +3,7 @@ package Elastic::Model::Role::Model; use Moose::Role; use Carp; use Elastic::Model::Types qw(ES); -use ElasticSearch 0.54 (); +use ElasticSearch 0.55 (); use Class::Load qw(load_class); use Moose::Util qw(does_role); use MooseX::Types::Moose qw(:all); diff --git a/lib/Elastic/Model/View.pm b/lib/Elastic/Model/View.pm index c1945d0..1c7a9c3 100644 --- a/lib/Elastic/Model/View.pm +++ b/lib/Elastic/Model/View.pm @@ -83,9 +83,11 @@ has 'from' => ( #=================================== has 'size' => ( #=================================== - isa => Int, - is => 'rw', - default => 10, + isa => Int, + is => 'rw', + lazy => 1, + default => 10, + predicate => '_has_size', ); #===================================