Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 38 additions & 24 deletions src/api.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# functionality as direclty as possible, but in a way that is natural for the
# Julia language.

#using Infiltrator
import JSON3
import Arrow

Expand Down Expand Up @@ -354,9 +355,6 @@ end
# waits .. consider creating two entry points for readonly and readwrite.

function exec_v2(ctx::Context, database::AbstractString, engine::AbstractString, source; inputs = nothing, readonly = false, kw...)
if inputs !== nothing
@error "inputs= is not yet supported in the v2 API. For now, please use `exec_v1(...)` instead."
end
source isa IO && (source = read(source, String))
tx_body = Dict(
"dbname" => database,
Expand All @@ -366,6 +364,9 @@ function exec_v2(ctx::Context, database::AbstractString, engine::AbstractString,
"readonly" => readonly,
# "sync_mode" => "async"
)
if inputs !== nothing
tx_body["v1_inputs"] = [_make_query_action_input(k, v) for (k, v) in inputs]
end
body = JSON3.write(tx_body)
path = _mkurl(ctx, PATH_ASYNC_TRANSACTIONS)
rsp = @mock request(ctx, "POST", path; body = body, kw...)
Expand Down Expand Up @@ -398,6 +399,12 @@ function get_transaction_metadata(ctx::Context, id::AbstractString; kw...)
return rsp
end

function get_transaction_problems(ctx::Context, id::AbstractString; kw...)
path = PATH_ASYNC_TRANSACTIONS * "/$id/problems"
rsp = _get(ctx, path; kw...)
return rsp
end

function get_transaction_results(ctx::Context, id::AbstractString; kw...)
path = PATH_ASYNC_TRANSACTIONS * "/$id/results"
path = _mkurl(ctx, path)
Expand All @@ -420,12 +427,17 @@ function _parse_multipart_fastpath_sync_response(msg)
@assert parts[1].name == "transaction"
@assert parts[2].name == "metadata"

results_and_problems = _extract_multipart_results_response(parts)
problems_idx = findfirst(p->p.name == "problems", parts)
problems = JSON3.read(parts[problems_idx])

results_start_idx = findfirst(p->startswith(p.name, '/'), parts)
results = _extract_multipart_results_response(@view(parts[results_start_idx:end]))

return (
transaction = JSON3.read(parts[1]),
metadata = JSON3.read(parts[2]),
results_and_problems...,
problems = problems,
results = results,
)
end

Expand All @@ -436,13 +448,9 @@ function _parse_multipart_results_response(msg)
return _extract_multipart_results_response(parts)
end
function _extract_multipart_results_response(parts)
problems_idx = findfirst(p->p.name == "problems", parts)
results_start_idx = findfirst(p->startswith(p.name, '/'), parts)

return (
problems = JSON3.read(parts[problems_idx]),
relations = [ResultPhysicalRelation(part.name, Arrow.Table(part.data)) for part in @view parts[results_start_idx:end]],
)
return [
ResultPhysicalRelation(part.name, Arrow.Table(part.data)) for part in parts
]
end


Expand Down Expand Up @@ -545,17 +553,23 @@ end


# --- utils -------------------------
# TODO: Delete this once https://github.com/JuliaWeb/HTTP.jl/issues/816 is addressed:
function _parse_multipart_form(msg::HTTP.Message)
# parse boundary from Content-Type
m = match(r"multipart/form-data; boundary=(.*)$", msg["Content-Type"])
m === nothing && return nothing

boundary_delimiter = m[1]

# [RFC2046 5.1.1](https://tools.ietf.org/html/rfc2046#section-5.1.1)
length(boundary_delimiter) > 70 && error("boundary delimiter must not be greater than 70 characters")

return HTTP.MultiPartParsing.parse_multipart_body(HTTP.payload(msg), boundary_delimiter)
# Patch for older versions of HTTP package that don't support parsing multipart responses:
if hasmethod(HTTP.MultiPartParsing.parse_multipart_form, (HTTP.Response,))
# Available as of HTTP v0.9.18:
_parse_multipart_form = HTTP.MultiPartParsing.parse_multipart_form
else
# This function is copied directly from this PR: https://github.com/JuliaWeb/HTTP.jl/pull/817
function _parse_multipart_form(msg::HTTP.Message)
# parse boundary from Content-Type
m = match(r"multipart/form-data; boundary=(.*)$", msg["Content-Type"])
m === nothing && return nothing

boundary_delimiter = m[1]

# [RFC2046 5.1.1](https://tools.ietf.org/html/rfc2046#section-5.1.1)
length(boundary_delimiter) > 70 && error("boundary delimiter must not be greater than 70 characters")

return HTTP.MultiPartParsing.parse_multipart_body(HTTP.payload(msg), boundary_delimiter)
end
end
# -----------------------------------
6 changes: 3 additions & 3 deletions test/api.jl
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ end
@test rsp.problems == Union{}[]

# Arrow.Tables can't be compared via == (https://github.com/apache/arrow-julia/issues/310)
@test length(rsp.relations) == 1
@test rsp.relations[1].name == "/:output/Int64"
@test collect(rsp.relations[1].data) == collect(data)
@test length(rsp.results) == 1
@test rsp.results[1].name == "/:output/Int64"
@test collect(rsp.results[1].data) == collect(data)
end
end
end