Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ElasticsearchClient"
uuid = "e586a49d-aa29-4ce5-8f91-fa4f824367bd"
authors = ["Egor Shmorgun <egor.shmorgun@opensesame.com>"]
version = "0.2.9"
version = "0.2.10"

[deps]
CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193"
Expand All @@ -24,6 +24,7 @@ julia = "1"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"

[targets]
test = ["Test"]
test = ["Test", "JSON3"]
20 changes: 17 additions & 3 deletions src/elastic_transport/client.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using HTTP
using URIs
using Mocking
using JSON

const DEFAULT_PORT = 9200
const DEFAULT_PROTOCOL = "http"
Expand All @@ -19,8 +20,10 @@ Create a client connected to an Elastic cluster.

# Possible arguments

- `http_client::Module`: A module that implement request method. Maybe useful if you need custom http layers. HTTP.jl used bu default.
- `http_client::Module`: A module that implement request method. Maybe useful if you need custom http layers. HTTP.jl used bu default.
- `hosts`: Single host passed as a String, Dict or NamedTuple, or multiple hosts passed as an Array; `host`, `url`, `urls` keys are also valid
- `serializer`: Function to serialise the body to JSON. JSON.json by default
- `deserializer`: Function to deserialise response body to Dict. JSON.parse by default
- `resurrect_after::Integer`: After how many seconds a dead connection should be tried again
- `reload_connection::Bool`: Reload connections after X requests (false by default)
- `randomize_hosts::Bool`: Shuffle connections on initialization and reload (false by default)
Expand All @@ -45,7 +48,12 @@ mutable struct Client
transport::Transport
end

function Client(;http_client::Module=HTTP, kwargs...)
function Client(;
http_client::Module=HTTP,
serializer::Function=JSON.json,
deserializer::Function=JSON.parse,
kwargs...
)
arguments = Dict{Symbol, Any}(kwargs)
options = deepcopy(arguments)

Expand All @@ -71,7 +79,13 @@ function Client(;http_client::Module=HTTP, kwargs...)
options[:transport_options][:request] = Dict(:timeout => arguments[:request_timeout])
end

transport = Transport(; hosts=hosts, options=options, http_client=http_client)
transport = Transport(;
hosts=hosts,
options=options,
http_client=http_client,
serializer=serializer,
deserializer=deserializer
)

Client(
arguments,
Expand Down
14 changes: 9 additions & 5 deletions src/elastic_transport/transport/transport.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ mutable struct Transport
retry_on_status::Vector{Integer}
verbose::Integer
http_client::Module
serializer::Function
deserializer::Function
end

function Transport(; hosts::Vector=[], options=Dict(), http_client::Module=HTTP)
function Transport(; hosts::Vector=[], options=Dict(), http_client::Module, serializer::Function, deserializer::Function)
!haskey(options, :http) && (options[:http] = Dict())
!haskey(options, :retry_on_status) && (options[:retry_on_status] = Integer[])
!haskey(options, :delay_on_retry) && (options[:delay_on_retry] = 0)
Expand All @@ -49,7 +51,9 @@ function Transport(; hosts::Vector=[], options=Dict(), http_client::Module=HTTP)
get(options, :resurrect_after, DEFAULT_RESURRECT_AFTER),
options[:retry_on_status],
options[:verbose],
http_client
http_client,
serializer,
deserializer
)
end

Expand Down Expand Up @@ -173,7 +177,7 @@ function perform_request(

headers = Connections.parse_headers(connection, headers)
if !isnothing(body) && !isa(body, String)
body = JSON.json(body)
body = transport.serializer(body)
end
body, headers = compress_request(transport, body, headers)

Expand Down Expand Up @@ -247,7 +251,7 @@ function perform_request(
end

if !isempty(response_body) && !isnothing(response_content_type) && !isnothing(match(r"json"i, response_content_type))
json = JSON.parse(response_body)
json = transport.deserializer(response_body)
took = if json isa Dict
get(json, "took", "n/a")
end
Expand Down Expand Up @@ -302,6 +306,6 @@ function log_response(method, body, url, response_status, response_body, took, d
message_level,
verbose
)
!isnothing(body) && @debug "> $(JSON.json(body))"
!isnothing(body) && log_message("> $body", Logging.Debug, verbose)
log_message("< $(String(response_body))", Logging.Debug, verbose)
end
34 changes: 29 additions & 5 deletions test/elastic_transport_test/transport_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ using Test
using Mocking
using HTTP
using JSON
using JSON3

Mocking.activate()

serializer = JSON.json
deserializer = JSON.parse

hosts = [
Dict{Symbol, Any}(:host => "localhost", :schema => "https"),
Dict{Symbol, Any}(:host => "127.0.0.1", :schema => "http", :port => 9250),
Expand Down Expand Up @@ -102,15 +106,15 @@ nodes_response_mock = HTTP.Response(

@testset "Transport test" begin
@testset "Transport initialization" begin
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options)
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options, http_client=HTTP, serializer=serializer, deserializer=deserializer)

@test length(transport.connections.connections) == length(hosts)
@test transport.use_compression == options[:compression]
@test transport.retry_on_status == options[:retry_on_status]
end

@testset "Performing request" begin
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options)
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer)

@testset "Testing with successful response" begin
@testset "Testing GET request with params" begin
Expand Down Expand Up @@ -223,14 +227,34 @@ nodes_response_mock = HTTP.Response(
@test length(ElasticsearchClient.ElasticTransport.Connections.dead(transport.connections)) == 1
end
end

@testset "Testing GET request with custom serializer/deserializer" begin
http_patch = @patch HTTP.request(args...;kwargs...) = successful_health_response_mock
custom_transport = ElasticsearchClient.ElasticTransport.Transport(;
hosts,
options=options,
http_client=HTTP,
serializer=JSON3.write,
deserializer=JSON3.read
)


apply(http_patch) do
response = ElasticsearchClient.ElasticTransport.perform_request(custom_transport, "GET", "/_cluster/health"; params = Dict("pretty" => true))

@test response isa HTTP.Response
@test response.status == 200
@test haskey(response.body, :cluster_name)
end
end
end
end

@testset "Testing sniffing" begin
@testset "Testing successful sniffing" begin
http_patch = @patch HTTP.request(args...;kwargs...) = nodes_response_mock

transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options)
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer)

apply(http_patch) do
hosts = ElasticsearchClient.ElasticTransport.sniff_hosts(transport) |>
Expand All @@ -256,7 +280,7 @@ nodes_response_mock = HTTP.Response(
@testset "Testing sniffing timeout" begin
http_patch = @patch HTTP.request(args...;kwargs...) = sleep(ElasticsearchClient.ElasticTransport.DEFAULT_SNIFFING_TIMEOUT + 0.5)

transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options)
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer)

apply(http_patch) do
@test_throws ElasticsearchClient.ElasticTransport.SniffingTimetoutError ElasticsearchClient.ElasticTransport.sniff_hosts(transport)
Expand All @@ -267,7 +291,7 @@ nodes_response_mock = HTTP.Response(
@testset "Testing reload connections" begin
http_patch = @patch HTTP.request(args...;kwargs...) = nodes_response_mock

transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options)
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer)

apply(http_patch) do
ElasticsearchClient.ElasticTransport.reload_connections!(transport)
Expand Down