Skip to content

Commit

Permalink
Merge 12feb95 into 3d4625a
Browse files Browse the repository at this point in the history
  • Loading branch information
burak committed May 29, 2018
2 parents 3d4625a + 12feb95 commit 0773b5f
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 38 deletions.
47 changes: 41 additions & 6 deletions lib/Elastijk.pm
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,45 @@ use strict;
use warnings;
our $VERSION = "0.12";

use constant DEBUG => $ENV{ELASTIJK_DEBUG} ? 1 : 0;

use JSON ();
use URI::Escape qw(uri_escape_utf8);
use Hijk;
use Elastijk::Constants qw( ESV_PARAM );

our $JSON = JSON->new->utf8;

sub _build_hijk_request_args {
my $args = $_[0];
my $is_vcheck = delete $args->{ ESV_PARAM() };

my ($path, $qs, $uri_param);
$path = "/". join("/", (map { defined($_) ? ( uri_escape_utf8($_) ) : () } @{$args}{qw(index type id)}), (exists $args->{command} ? $args->{command} : ()));
$path = "/" . join( "/",
(
map { defined($_) ? ( uri_escape_utf8($_) ) : () }
map { $args->{ $_ } }
qw( index type id )
),
( exists $args->{command} ? $args->{command} : () )
);

if ($args->{uri_param}) {
$qs = join('&', map { uri_escape_utf8($_) . "=" . uri_escape_utf8($args->{uri_param}{$_}) } keys %{$args->{uri_param}});
}
return {

my $rv = {
method => $args->{method} || 'GET',
host => $args->{host} || 'localhost',
port => $args->{port} || '9200',
path => $path,
( $is_vcheck ? () : (
path => $path,
)),
$qs?( query_string => $qs) :(),
(map { (exists $args->{$_})?( $_ => $args->{$_} ) :() } qw(connect_timeout read_timeout head body socket_cache on_connect)),
}
};

return $rv;
}

sub request {
Expand All @@ -33,14 +51,31 @@ sub request {
$arg->{body} = $JSON->encode( $arg->{body} );
}
my ($status, $res_body) = request_raw($arg);
$res_body = $res_body ? eval { $JSON->decode($res_body) } : undef;

if ( $res_body && ! ref $res_body ) {
eval {
$res_body = $JSON->decode($res_body);
1;
} or do {
my $eval_error = $@ || 'Zombie error';
DEBUG && warn "Error decoding JSON: " . $eval_error;
};
}

return $status, $res_body;
}

sub request_raw {
my $args = _build_hijk_request_args($_[0]);
my $res = Hijk::request($args);
return (exists $res->{error}) ? (0, '{"error":1,"hijk_error":'.$res->{error}.'}') : ($res->{status}, $res->{body});

my $not_ok = $res->{status} && $res->{status} =~ m{ \A [^2] }xms;

return
exists $res->{error} ? ( 0, { error => 1, hijk_error => $res->{error} } )
: ( $not_ok && $res->{body} ) ? ( $res->{status}, { error => 1, hijk_error => $res->{body} } )
: ( $res->{status}, $res->{body} )
;
}

sub new {
Expand Down
17 changes: 17 additions & 0 deletions lib/Elastijk/Constants.pm
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package Elastijk::Constants;
use strict;
use warnings;
use base qw( Exporter );

use Elastijk;

use constant {
# needs to be some unique identifier
ESV_PARAM => '____Elastijk::Elasticsearch::VERSION::Query____',
};

our @EXPORT_OK = qw(
ESV_PARAM
);

1;
76 changes: 71 additions & 5 deletions lib/Elastijk/oo.pm
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package Elastijk::oo;
use strict;
use warnings;
use Elastijk;
use Elastijk::Constants qw( ESV_PARAM );

sub new {
my $class = shift;
return bless { host => "localhost", port => "9200", @_ }, $class;
my $self = bless {
host => "localhost",
port => "9200",
@_,
}, $class;
return $self;
}

{
Expand All @@ -15,7 +21,34 @@ sub new {

sub __fleshen_request_args {
my ($self, $args) = @_;
$args->{$_} ||= $self->{$_} for grep { exists $self->{$_} } qw(host port index type head socket_cache on_connect connect_timeout read_timeout);

# Do NOT delete in here, it needs to stay in args as
# Elastijk::_build_hijk_request_args will use it as well
#
my $is_vcheck = $args->{ ESV_PARAM() };

my @known_meta = qw(
index
);
my @known = qw(
connect_timeout
head
host
on_connect
port
read_timeout
socket_cache
type
);

# so that the version check won't result
# with an error upon non-existing index.
push @known, @known_meta if ! $is_vcheck;

$args->{$_} ||= $self->{$_}
for grep { exists $self->{$_} } @known;

return;
}

sub request {
Expand Down Expand Up @@ -91,7 +124,7 @@ sub scan_scroll {
my $on_response_callback = delete $args{on_response};

my %uri_param = %{ delete($args{uri_param}) || {} };
$uri_param{search_type} = "scan";
$uri_param{search_type} = "scan" if $self->elasticsearch_version(major_only => 1) < 5;
$uri_param{scroll} ||= "10m";
my $scroll_id;
my ($status, $res) = $self->get(%args, command => "_search", uri_param => \%uri_param);
Expand All @@ -102,8 +135,12 @@ sub scan_scroll {
while (1) {
$scroll_id = $res->{_scroll_id};
($status,$res) = $self->get(
index => "_search", type => "scroll", #WTF
uri_param => { scroll => $uri_param{scroll}, scroll_id => $scroll_id }
index => "_search",
type => "scroll", #WTF
uri_param => {
scroll => $uri_param{scroll},
scroll_id => $scroll_id,
},
);
if (substr($status,0,1) eq '2' && @{$res->{hits}{hits}} > 0) {
my $r = $on_response_callback->($status, $res);
Expand All @@ -115,4 +152,33 @@ sub scan_scroll {
}
}

sub elasticsearch_version {
my ($self, %args) = @_;

if ( ! $self->{current_es_version} ) {
my ($status, $response) = $self->get( ESV_PARAM() => 1 );

if ( $status !~ m{ \A 2 }xms ) {
warn sprintf "Got an error response from ElasticSearch(%s): %s",
$status,
$Elastijk::JSON->encode( $response );
} else {
if ( $response && ref $response eq 'HASH' ) {
$self->{current_es_version} = $response->{version}{number};
}

warn "Could not get ElasticSearch version number"
if ! $self->{current_es_version};
}
}

my $version = $self->{current_es_version};
my $rv = defined $version
? $args{major_only} ? (split m{ [.] }xms, $version)[0]
: $version
: undef;

return $rv;
}

1;
4 changes: 2 additions & 2 deletions t/build-hijk-request-args.t
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use Test::More;
use Elastijk;

my @base_arg = (
host => "127.0.0.1",
port => "9200",
host => $ENV{TEST_HOST} || 'localhost',
port => $ENV{TEST_PORT} || 9200,
method => "GET",
);
my @tests = (
Expand Down
16 changes: 11 additions & 5 deletions t/live-index-create-delete.t
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@ unless ($ENV{TEST_LIVE}) {
plan skip_all => "Set env TEST_LIVE=1 to run this test."
}

my @base_arg = (
host => $ENV{TEST_HOST} || 'localhost',
port => $ENV{TEST_PORT} || 9200,
);

my ($status, $res);
my $test_index_name = "test_$$";
my $test_index_name = "elastijk_unit_test_index_$$".rand();

## index exists
($status, $res) = Elastijk::request({ method => "HEAD", index => $test_index_name });
($status, $res) = Elastijk::request({ @base_arg, method => "HEAD", index => $test_index_name });
is $status, "404", "$test_index_name missing";

## index creation
($status, $res) = Elastijk::request({
@base_arg,
method => "PUT",
index => $test_index_name,
body => {
Expand All @@ -34,18 +40,18 @@ is $status, "200";
ok( ($res->{ok} || $res->{acknowledged}) , encode_json($res)) if $res;

## index exists
($status, $res) = Elastijk::request({ method => "HEAD", index => $test_index_name });
($status, $res) = Elastijk::request({ @base_arg, method => "HEAD", index => $test_index_name });
is $status, "200", "$test_index_name exists";
# diag encode_json($res);

## delete it.
($status, $res) = Elastijk::request({ method => "DELETE", index => $test_index_name });
($status, $res) = Elastijk::request({ @base_arg, method => "DELETE", index => $test_index_name });
is $status, "200";
ok( ($res->{ok} || $res->{acknowledged}) , encode_json($res)) if $res;
# diag encode_json($res);

## index exists
($status, $res) = Elastijk::request({ method => "HEAD", index => $test_index_name });
($status, $res) = Elastijk::request({ @base_arg, method => "HEAD", index => $test_index_name });
is $status, "404", "$test_index_name missing";

done_testing;
Expand Down
25 changes: 15 additions & 10 deletions t/live-oo-index-and-search.t
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ unless ($ENV{TEST_LIVE}) {

use Elastijk;

my @base_arg = (
host => $ENV{TEST_HOST} || 'localhost',
port => $ENV{TEST_PORT} || 9200,
);

my $res;
my $test_index_name = "test_index_$$".rand();
my $test_index_name = "elastijk_unit_test_index_$$".rand();

my $es = Elastijk->new(
host => 'localhost',
port => '9200',
@base_arg,
index => $test_index_name,
);


## create the index, and index some documents.
$res = $es->put(
index => $test_index_name,
Expand All @@ -40,7 +43,7 @@ $res = $es->put(
}
}
);
ok $es->exists( index => $test_index_name ), "The newly created index doe exist.";
ok $es->exists( index => $test_index_name ), "The newly created index does exist.";

subtest "index 2 documents" => sub {
my $sources = [{
Expand All @@ -59,10 +62,12 @@ subtest "index 2 documents" => sub {
is ref($res), 'HASH';
is ref($res->{items}), 'ARRAY';

my $es_major = $es->elasticsearch_version(major_only => 1);

for(my $i = 0; $i < @$sources; $i++) {
my $source = $sources->[$i];
my ($action, $res2) = (%{$res->{items}[$i]});
is $action, 'create';
is $action, ( $es_major >= 5 ? 'index' : 'create' );
my $res3 = $es->get( type => "cafe", id => $res2->{_id} );
is_deeply($res3->{_source}, $source);
}
Expand Down Expand Up @@ -110,9 +115,7 @@ subtest "index a single document, then get it." => sub {

subtest "index 2 documents with the value of 'type' attribute in the object." => sub {
my $es = Elastijk->new(
host => 'localhost',
port => '9200',

@base_arg,
index => $test_index_name,
type => "cafe",
);
Expand All @@ -132,10 +135,12 @@ subtest "index 2 documents with the value of 'type' attribute in the object." =>
is ref($res), 'HASH';
is ref($res->{items}), 'ARRAY';

my $es_major = $es->elasticsearch_version(major_only => 1);

for(my $i = 0; $i < @$sources; $i++) {
my $source = $sources->[$i];
my ($action, $res2) = (%{$res->{items}[$i]});
is $action, 'create';
is $action, ( $es_major >= 5 ? 'index' : 'create' );
my $res3 = $es->get( type => "cafe", id => $res2->{_id} );
is_deeply($res3->{_source}, $source);
}
Expand Down
10 changes: 7 additions & 3 deletions t/live-oo-index-create-and-delete.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ unless ($ENV{TEST_LIVE}) {
plan skip_all => "Set env TEST_LIVE=1 to run this test."
}


use Elastijk;

my $es = Elastijk->new( host => "localhost", port => "9200" );
my $test_index_name = "test_index_$$";
my @base_arg = (
host => $ENV{TEST_HOST} || 'localhost',
port => $ENV{TEST_PORT} || 9200,
);

my $es = Elastijk->new( @base_arg );
my $test_index_name = "elastijk_unit_test_index_$$".rand();
my $res;

subtest "create an index with settings and mappings" => sub {
Expand Down
9 changes: 7 additions & 2 deletions t/live-oo-scan-scroll.t
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@ unless ($ENV{TEST_LIVE}) {

use Elastijk;

my $test_index_name = "test_index_$$".rand();
my $es = Elastijk->new(host => 'localhost', port => '9200', index => $test_index_name );
my @base_arg = (
host => $ENV{TEST_HOST} || 'localhost',
port => $ENV{TEST_PORT} || 9200,
);

my $test_index_name = "elastijk_unit_test_index_$$".rand();
my $es = Elastijk->new(@base_arg, index => $test_index_name );
## create the index, and index some documents.
$es->put(
body => {
Expand Down
7 changes: 3 additions & 4 deletions t/live-search-and-stats.t
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ unless ($ENV{TEST_LIVE}) {
}

my @base_arg = (
host => "127.0.0.1",
port => "9200",
method => "GET",
host => $ENV{TEST_HOST} || 'localhost',
port => $ENV{TEST_PORT} || 9200,
);

my @tests = (
Expand All @@ -27,7 +26,7 @@ my @tests = (
);

for my $req_args (@tests) {
my $args = { @base_arg, %$req_args };
my $args = { @base_arg, method => "GET", %$req_args };
my ($status, $res) = Elastijk::request($args);
is ref($res), 'HASH', substr(JSON::encode_json($res), 0, 60)."...";
}
Expand Down
Loading

0 comments on commit 0773b5f

Please sign in to comment.