From d0eda7c28668ec653252c3b51136542bac9e4e10 Mon Sep 17 00:00:00 2001 From: Egor Shmorgun Date: Fri, 13 Oct 2023 15:33:21 +0300 Subject: [PATCH] add serializer and deserializer as client arguments --- Project.toml | 5 +-- src/elastic_transport/client.jl | 20 +++++++++-- src/elastic_transport/transport/transport.jl | 14 +++++--- test/elastic_transport_test/transport_test.jl | 34 ++++++++++++++++--- 4 files changed, 58 insertions(+), 15 deletions(-) diff --git a/Project.toml b/Project.toml index 8d2bd1f..66e5315 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ElasticsearchClient" uuid = "e586a49d-aa29-4ce5-8f91-fa4f824367bd" authors = ["Egor Shmorgun "] -version = "0.2.9" +version = "0.2.10" [deps] CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193" @@ -24,6 +24,7 @@ julia = "1" [extras] Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" +JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" [targets] -test = ["Test"] +test = ["Test", "JSON3"] diff --git a/src/elastic_transport/client.jl b/src/elastic_transport/client.jl index e6074da..c3f6e6a 100644 --- a/src/elastic_transport/client.jl +++ b/src/elastic_transport/client.jl @@ -1,6 +1,7 @@ using HTTP using URIs using Mocking +using JSON const DEFAULT_PORT = 9200 const DEFAULT_PROTOCOL = "http" @@ -19,8 +20,10 @@ Create a client connected to an Elastic cluster. # Possible arguments -- `http_client::Module`: A module that implement request method. Maybe useful if you need custom http layers. HTTP.jl used bu default. +- `http_client::Module`: A module that implement request method. Maybe useful if you need custom http layers. HTTP.jl used bu default. - `hosts`: Single host passed as a String, Dict or NamedTuple, or multiple hosts passed as an Array; `host`, `url`, `urls` keys are also valid +- `serializer`: Function to serialise the body to JSON. JSON.json by default +- `deserializer`: Function to deserialise response body to Dict. JSON.parse by default - `resurrect_after::Integer`: After how many seconds a dead connection should be tried again - `reload_connection::Bool`: Reload connections after X requests (false by default) - `randomize_hosts::Bool`: Shuffle connections on initialization and reload (false by default) @@ -45,7 +48,12 @@ mutable struct Client transport::Transport end -function Client(;http_client::Module=HTTP, kwargs...) +function Client(; + http_client::Module=HTTP, + serializer::Function=JSON.json, + deserializer::Function=JSON.parse, + kwargs... +) arguments = Dict{Symbol, Any}(kwargs) options = deepcopy(arguments) @@ -71,7 +79,13 @@ function Client(;http_client::Module=HTTP, kwargs...) options[:transport_options][:request] = Dict(:timeout => arguments[:request_timeout]) end - transport = Transport(; hosts=hosts, options=options, http_client=http_client) + transport = Transport(; + hosts=hosts, + options=options, + http_client=http_client, + serializer=serializer, + deserializer=deserializer + ) Client( arguments, diff --git a/src/elastic_transport/transport/transport.jl b/src/elastic_transport/transport/transport.jl index b1a2638..a5c85f8 100644 --- a/src/elastic_transport/transport/transport.jl +++ b/src/elastic_transport/transport/transport.jl @@ -27,9 +27,11 @@ mutable struct Transport retry_on_status::Vector{Integer} verbose::Integer http_client::Module + serializer::Function + deserializer::Function end -function Transport(; hosts::Vector=[], options=Dict(), http_client::Module=HTTP) +function Transport(; hosts::Vector=[], options=Dict(), http_client::Module, serializer::Function, deserializer::Function) !haskey(options, :http) && (options[:http] = Dict()) !haskey(options, :retry_on_status) && (options[:retry_on_status] = Integer[]) !haskey(options, :delay_on_retry) && (options[:delay_on_retry] = 0) @@ -49,7 +51,9 @@ function Transport(; hosts::Vector=[], options=Dict(), http_client::Module=HTTP) get(options, :resurrect_after, DEFAULT_RESURRECT_AFTER), options[:retry_on_status], options[:verbose], - http_client + http_client, + serializer, + deserializer ) end @@ -173,7 +177,7 @@ function perform_request( headers = Connections.parse_headers(connection, headers) if !isnothing(body) && !isa(body, String) - body = JSON.json(body) + body = transport.serializer(body) end body, headers = compress_request(transport, body, headers) @@ -247,7 +251,7 @@ function perform_request( end if !isempty(response_body) && !isnothing(response_content_type) && !isnothing(match(r"json"i, response_content_type)) - json = JSON.parse(response_body) + json = transport.deserializer(response_body) took = if json isa Dict get(json, "took", "n/a") end @@ -302,6 +306,6 @@ function log_response(method, body, url, response_status, response_body, took, d message_level, verbose ) - !isnothing(body) && @debug "> $(JSON.json(body))" + !isnothing(body) && log_message("> $body", Logging.Debug, verbose) log_message("< $(String(response_body))", Logging.Debug, verbose) end diff --git a/test/elastic_transport_test/transport_test.jl b/test/elastic_transport_test/transport_test.jl index ef3e16b..9ffaa40 100644 --- a/test/elastic_transport_test/transport_test.jl +++ b/test/elastic_transport_test/transport_test.jl @@ -3,9 +3,13 @@ using Test using Mocking using HTTP using JSON +using JSON3 Mocking.activate() +serializer = JSON.json +deserializer = JSON.parse + hosts = [ Dict{Symbol, Any}(:host => "localhost", :schema => "https"), Dict{Symbol, Any}(:host => "127.0.0.1", :schema => "http", :port => 9250), @@ -102,7 +106,7 @@ nodes_response_mock = HTTP.Response( @testset "Transport test" begin @testset "Transport initialization" begin - transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options) + transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options, http_client=HTTP, serializer=serializer, deserializer=deserializer) @test length(transport.connections.connections) == length(hosts) @test transport.use_compression == options[:compression] @@ -110,7 +114,7 @@ nodes_response_mock = HTTP.Response( end @testset "Performing request" begin - transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options) + transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer) @testset "Testing with successful response" begin @testset "Testing GET request with params" begin @@ -223,6 +227,26 @@ nodes_response_mock = HTTP.Response( @test length(ElasticsearchClient.ElasticTransport.Connections.dead(transport.connections)) == 1 end end + + @testset "Testing GET request with custom serializer/deserializer" begin + http_patch = @patch HTTP.request(args...;kwargs...) = successful_health_response_mock + custom_transport = ElasticsearchClient.ElasticTransport.Transport(; + hosts, + options=options, + http_client=HTTP, + serializer=JSON3.write, + deserializer=JSON3.read + ) + + + apply(http_patch) do + response = ElasticsearchClient.ElasticTransport.perform_request(custom_transport, "GET", "/_cluster/health"; params = Dict("pretty" => true)) + + @test response isa HTTP.Response + @test response.status == 200 + @test haskey(response.body, :cluster_name) + end + end end end @@ -230,7 +254,7 @@ nodes_response_mock = HTTP.Response( @testset "Testing successful sniffing" begin http_patch = @patch HTTP.request(args...;kwargs...) = nodes_response_mock - transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options) + transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer) apply(http_patch) do hosts = ElasticsearchClient.ElasticTransport.sniff_hosts(transport) |> @@ -256,7 +280,7 @@ nodes_response_mock = HTTP.Response( @testset "Testing sniffing timeout" begin http_patch = @patch HTTP.request(args...;kwargs...) = sleep(ElasticsearchClient.ElasticTransport.DEFAULT_SNIFFING_TIMEOUT + 0.5) - transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options) + transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer) apply(http_patch) do @test_throws ElasticsearchClient.ElasticTransport.SniffingTimetoutError ElasticsearchClient.ElasticTransport.sniff_hosts(transport) @@ -267,7 +291,7 @@ nodes_response_mock = HTTP.Response( @testset "Testing reload connections" begin http_patch = @patch HTTP.request(args...;kwargs...) = nodes_response_mock - transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options) + transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer) apply(http_patch) do ElasticsearchClient.ElasticTransport.reload_connections!(transport)