Skip to content

Commit

Permalink
feat(client): flux query executing and parsing
Browse files Browse the repository at this point in the history
Add supports for flux query request and the parsing of results into
table structure.
  • Loading branch information
kimburgess committed Feb 13, 2020
1 parent 4212a3d commit 6933d1c
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 82 deletions.
3 changes: 0 additions & 3 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ dependencies:
tasker:
github: spider-gazelle/tasker
branch: master
burrito:
github: aca-labs/burrito
branch: master

development_dependencies:
ameba:
Expand Down
51 changes: 51 additions & 0 deletions spec/flux/client_spec.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require "json"
require "../spec_helper"

describe Flux::Client do
Expand Down Expand Up @@ -39,4 +40,54 @@ describe Flux::Client do
client.write "test", points
end
end

describe "#query" do
io = IO::Memory.new <<-CSV
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
#group,false,false,false,false,false,true,false,false
#default,,,,,,,,
,result,table,_start,_stop,_time,region,host,_value
,my-result,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:00Z,east,A,15.43
,my-result,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:20Z,east,B,59.25
,my-result,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:40Z,east,C,52.62
,my-result,1,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:00Z,west,A,62.73
,my-result,1,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:20Z,west,B,12.83
,my-result,1,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:40Z,west,C,51.62
CSV

before_each { io.rewind }

WebMock.stub(:post, "http://example.com/api/v2/query")
.with(
headers: {
"Authorization" => "Token abc",
"Accept" => "application/csv",
"Content-type" => "application/vnd.flux"
},
query: {
"org" => "foo",
},
body: {
query: "test"
}.to_json
)
.to_return(io)

it "provides untyped results" do
tables = client.query "test"
tables.first.first["region"].should eq("east")
end

it "provides the ablity to read into a known schema" do
tables = client.query "test" do |row|
{
time: Time::Format::RFC_3339.parse(row["_time"]),
region: row["region"],
host: row["host"],
value: row["_value"].to_f
}
end
tables.first.first[:host].should eq("A")
end
end
end
31 changes: 14 additions & 17 deletions src/flux/buffered_writer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,21 @@ class Flux::BufferedWriter
#
# Error will be retried as appropriate.
private def write(points : Enumerable(Point), retries = 3)
result = client.write bucket, points
case result
when TooManyRequests
client.log.warn result
sleep result.retry_after
write points
when ServerError
retries -= 1
if retries > 0
client.log.warn "#{result}, retrying write request"
write points, retries
else
client.log.error "#{result}, retries exhausted - dropping write request"
end
when Error
client.log.error "#{result}, dropping write request"
client.write bucket, points
client.log.info "#{points.size} points written"
rescue ex : TooManyRequests
client.log.warn ex.message
sleep ex.retry_after
write points
rescue ex : ServerError
retries -= 1
if retries > 0
client.log.warn "#{ex.message}, retrying write request"
write points, retries
else
client.log.info "#{points.size} points written"
client.log.error "#{ex.message}, retries exhausted - dropping write request"
end
rescue ex : Error
client.log.error "#{ex.message}, dropping write request"
end
end
42 changes: 29 additions & 13 deletions src/flux/client.cr
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
require "http/client"
require "logger"
require "uri"
require "./errors"
require "./point"
require "./query_result"

# FIXME: underclared namespaces are infered to by modules by default. Drop this
# when https://github.com/crystal-lang/crystal/issues/8685 is resolved.
class Flux::Client; end
require "./client/*"

class Flux::Client
getter log : Logger

Expand Down Expand Up @@ -74,25 +70,45 @@ class Flux::Client
request.content_length = data.size

response = connection.exec request
check_response! response

# FIXME: currently this is triggering a compiler bug. Re-enable status code
# checked when resolved.
# See: https://github.com/crystal-lang/crystal/issues/7113
# Result.from(response).is(HTTP::Status::NO_CONTENT).value
nil
end

Response.from(response).value
# Runs a query on the connected InfluxDB instance.
#
# *expression* must be a valid Flux expression. All returned records will by a
# Hash of String => String. To parse into stricter types, use variant of this
# method accepting a block.
def query(expression : String)
query expression, &.to_h
end

# Runs a query on the connected InfluxDB instance.
#
# *expression* must be a valid Flux expression.
def query(expression : String)
def query(expression : String, &block : CSV::Row, Array(QueryResult::Column) -> T) forall T
query_internal expression do |io|
QueryResult.parse io, &block
end
end

# Runs a query on the connected InfluxDB instance, returning the result.
private def query_internal(expression : String, &block : IO -> T) forall T
headers = HTTP::Headers.new
headers.add "Accept", "application/csv"
headers.add "Content-type", "application/vnd.flux"

response = connection.post "/query", headers, body
body = { query: expression }.to_json

connection.post "/query", headers, body do |response|
check_response! response
yield response.body_io
end
end

Response.from(response).map(&.body_io).value
# Checks a HTTP response and raises an error if the status was not successful.
private def check_response!(response : HTTP::Client::Response) : Nil
Error.from(response).try { |e| raise e }
end
end
40 changes: 0 additions & 40 deletions src/flux/client/response.cr

This file was deleted.

10 changes: 1 addition & 9 deletions src/flux/errors.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ require "http/client/response"

module Flux
# Base class for all error types originating from interaction with InfluxDB.
abstract class Error
abstract class Error < Exception
# Contructs a concrete error object from a client response.
def self.from(response : HTTP::Client::Response) : Error?
message = response.status_message || "HTTP #{response.status_code})"
Expand All @@ -18,14 +18,6 @@ module Flux
ServerError.new message
end
end

getter message : String

def initialize(@message); end

def to_s(io : IO)
io << message
end
end

class ClientError < Error; end
Expand Down

0 comments on commit 6933d1c

Please sign in to comment.