Skip to content

Commit

Permalink
Support new Tables.jl interface (#209)
Browse files Browse the repository at this point in the history
* Support new Tables.jl interface

* Fixes

* Fixes

* Fixes

* Fix CSv

* Fix mysql tests

* Remove call of showall

* Fix postgres tests

* Fix

* Update docs
  • Loading branch information
quinnj committed Oct 20, 2018
1 parent cee7ff0 commit 41b7b6e
Show file tree
Hide file tree
Showing 13 changed files with 302 additions and 414 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -23,7 +23,7 @@ julia> Pkg.add("ODBC")

## Project Status

The package is tested against Julia `0.5` on Linux, OS X, and Windows.
The package is tested against Julia `1.0` on Linux, OSX, and Windows.

## Contributing and Questions

Expand Down
7 changes: 3 additions & 4 deletions REQUIRE
@@ -1,7 +1,6 @@
julia 0.7-
DataStreams 0.3.0
DataFrames
julia 0.7
DataFrames 0.14.0
CategoricalArrays 0.2
WeakRefStrings 0.4.1
DecFP 0.3.0
Missings
Tables
55 changes: 19 additions & 36 deletions docs/src/index.md
Expand Up @@ -33,18 +33,13 @@ The second method takes a full connection string. Connection strings are vendor-

The ODBC.jl package ships an experimental REPL mode for convenience in rapid query execution. The REPL mode can be accessed by hitting the `]` character at an empty `julia>` prompt. The prompt will change to `sql>` and SQL queries can be entered directly and executed by pressing `enter`. Since the queries need an `ODBC.DSN` to execute against, the most recently connected `ODBC.DSN` is used automatically, so a valid connection must have been created before entering the `sql>` REPL mode. Query results are shown directly in the REPL, and the prompt will stay in `sql>` mode until `backspace` is pressed at an empty `sql>` prompt. The results of the last query can then be accessed back at the `julia>` prompt via the global `odbcdf` variable.

Methods:

`ODBC.query(dsn::ODBC.DSN, sql::AbstractString, sink=DataFrame, args...; weakrefstrings::Bool=true, append::Bool=false)`

`ODBC.query(dsn::DSN, sql::AbstractString, sink::T; weakrefstrings::Bool=true, append::Bool=false) where T`

`ODBC.query(source::ODBC.Source, sink=DataFrame, args...; append::Bool=false)`
`ODBC.query(dsn::ODBC.DSN, sql::AbstractString)`
`ODBC.Query(dsn::ODBC.DSN, sql::AbstractString) |> DataFrame`
`ODBC.Query(dsn::ODBC.DSN, sql::AbstractString) |> CSV.write("output.csv")`
`ODBC.Query(dsn::ODBC.DSN, sql::AbstractString) |> SQLite.load!(db, table_name)`
`ODBC.Query(dsn::ODBC.DSN, sql::AbstractString) |> Feather.write("output.feather")`

`ODBC.query(source::ODBC.Source, sink::T; append::Bool=false) where T`


`ODBC.query` is a high-level method for sending an SQL statement to a system and returning the results. As is shown, a valid `dsn::ODBC.DSN` and SQL statement `sql` combo can be sent, as well as an already-constructed `source::ODBC.Source`. By default, the results will be returned in a [`DataFrame`](http://juliadata.github.io/DataFrames.jl/latest/), but a variety of options exist for returning results, including `CSV.Sink`, `SQLite.Sink`, or `Feather.Sink`. `ODBC.query` actually utilizes the `DataStreams.jl` framework, so any valid [`Data.Sink`](http://juliadata.github.io/DataStreams.jl/latest/#Data.Sink-Interface-1) can be used to return results. The `append=false` keyword specifies whether the results should be *added to* any existing data in the `Data.Sink`, or if the resultset should fully replace any existing data. The `weakrefstrings` argument indicates whether `WeakRefString`s should be used by default for efficiency.
`ODBC.query` is a high-level method for sending an SQL statement to a system and returning the results. As is shown, a valid `dsn::ODBC.DSN` and SQL statement `sql` combo are the arguments. By default, the results will be returned in a [`DataFrame`](http://juliadata.github.io/DataFrames.jl/latest/), but a variety of options exist for handling results, including `CSV.write`, `SQLite.load!`, or `Feather.write`. `ODBC.Query` executes a query and returns metadata about the return results and satisfies the [Tables.jl](https://github.com/JuliaData/Tables.jl) interface for allowing integration with the numerous other formats.

Examples:

Expand All @@ -56,36 +51,28 @@ df = ODBC.query(dsn, "select * from cool_table")

# return result as a csv file
using CSV
csv = ODBC.query(dsn, "select * from cool_table", CSV.Sink, "cool_table.csv")
csv = ODBC.Query(dsn, "select * from cool_table") |> CSV.write("cool_table.csv")

# return the result directly into a local SQLite table
using SQLite
db = SQLite.DB()

sqlite = ODBC.query(dsn, "select * from cool_table", SQLite.Sink, db, "cool_table_in_sqlite")
sqlite = ODBC.Query(dsn, "select * from cool_table") |> SQLite.load!(db, "cool_table_in_sqlite")

# return the result as a feather-formatted binary file
using Feather
feather = ODBC.query(dsn, "select * from cool_table", Feather.Sink, "cool_table.feather")
feather = ODBC.Query(dsn, "select * from cool_table") |> Feather.write("cool_table.feather")

```

### `ODBC.load`
### `ODBC.load!`

Methods:
`ODBC.load(dsn::DSN, table::AbstractString, ::Type{T}, args...; append::Bool=false) where T`

`ODBC.load(dsn::DSN, table::AbstractString, source; append::Bool=false)`

`ODBC.load(sink::Sink, ::Type{T}, args...; append::Bool=false) where T`

`ODBC.load(sink::Sink, source; append::Bool=false)`

`ODBC.load` is a sister method to `ODBC.query`, but instead of providing a robust way of *returning* results, it allows one to *send* data to a DB.
`ODBC.load!(table, dsn::DSN, tablename::AbstractString)`

**Please note this is currently experimental and ODBC driver-dependent; meaning, an ODBC driver must impelement certain low-level API methods to enable this feature. This is not a limitation of ODBC.jl itself, but the ODBC driver provided by the vendor. In the case this method doesn't work for loading data, please see the documentation around prepared statements.**

`ODBC.load` takes a valid DB connection `dsn` and the name of an *existing* table `table` to which to send data. Note that on-the-fly creation of a table is not currently supported. The data to send can be any valid [`Data.Source`](http://juliadata.github.io/DataStreams.jl/latest/#Data.Source-Interface-1) object, from the `DataStreams.jl` framework, including a `DataFrame`, `CSV.Source`, `SQLite.Source`, `Feather.Source`, etc.
`ODBC.load!` takes a valid DB connection `dsn` and the name of an *existing* table `tablename` to which to send data. Note that on-the-fly creation of a table is not currently supported. The data to send can be any valid [`Tables.jl`](https://github.com/JuliaData/Tables.jl) implementor, from the `Tables.jl` framework, including a `DataFrame`, `CSV.File`, `SQLite.Query`, etc.

Examples:

Expand All @@ -98,22 +85,18 @@ ODBC.execute!(dsn, "CREATE TABLE cool_table (col1 INT, col2 FLOAT, col3 VARCHAR)
# load data from a DataFrame into the table
df = DataFrame(col1=[1,2,3], col2=[4.0, 5.0, 6.0], col3=["hey", "there", "sailor"])

ODBC.load(dsn, "cool_table", df)
ODBC.load!(dsn, "cool_table", df)

# load data from a csv file
using CSV

ODBC.load(dsn, "cool_table", CSV.Source, "cool_table.csv")
ODBC.load!(dsn, "cool_table", CSV.File("cool_table.csv"))

# load data from an SQLite table
using SQLite

ODBC.load(dsn, "cool_table", SQLite.Source, "select * from cool_table")

# load data from a feather-formatted binary file
using Feather

ODBC.load(dsn, "cool_table", Feather.Source, "cool_table.feather")
db = SQLite.DB()
ODBC.load!(dsn, "cool_table", SQLite.Query(db, "select * from cool_table"))

```

Expand Down Expand Up @@ -156,10 +139,10 @@ Methods:
`ODBC.execute!` provides a method for executing a statement against a DB without returning any results. Certain SQL statements known as "DDL" statements are used to modify objects in a DB and don't have results to return anyway. While `ODBC.query` can still be used for these types of statements, `ODBC.execute!` is much more efficient. This method is also used to execute prepared statements, as noted in the documentation for `ODBC.prepare`.


### `ODBC.Source`
### `ODBC.Query`

Constructors:

`ODBC.Source(dsn::ODBC.DSN, querystring::String) => ODBC.Source`
`ODBC.Query(dsn::ODBC.DSN, querystring::String) => ODBC.Query`

`ODBC.Source` is an implementation of a `Data.Source` in the [DataStreams.jl](http://juliadata.github.io/DataStreams.jl/latest/#Data.Source-Interface-1) framework. It takes a valid DB connection `dsn` and executes a properly formatted SQL query string `querystring` and makes preparations for returning a resultset.
`ODBC.Query` is an implementation of the [Tables.jl](https://github.com/JuliaData/Tables.jl) interface. It takes a valid DB connection `dsn` and executes a properly formatted SQL query string `querystring` and makes preparations for returning a resultset.
26 changes: 11 additions & 15 deletions src/API.jl
Expand Up @@ -55,13 +55,9 @@ const RETURN_VALUES = Dict(SQL_ERROR => "SQL_ERROR",

macro odbc(func,args,vals...)
if Sys.iswindows()
esc(quote
ret = ccall( ($func, odbc_dm), stdcall, SQLRETURN, $args, $(vals...))
end)
esc(:(ccall( ($func, $odbc_dm), stdcall, SQLRETURN, $args, $(vals...))))
else
esc(quote
ret = ccall( ($func, odbc_dm), SQLRETURN, $args, $(vals...))
end)
esc(:(ccall( ($func, $odbc_dm), SQLRETURN, $args, $(vals...))))
end
end

Expand Down Expand Up @@ -143,9 +139,9 @@ const SQL_TRUE = 1
const SQL_FALSE = 0

#Status: Tested on Windows, Linux, Mac 32/64-bit
function SQLSetEnvAttr(env_handle::Ptr{Cvoid}, attribute::Int, value::T) where {T<:Union{Int,UInt}}
function SQLSetEnvAttr(env_handle::Ptr{Cvoid}, attribute::Int, value::Integer)
@odbc(:SQLSetEnvAttr,
(Ptr{Cvoid}, Int, T, Int), env_handle, attribute, value, 0)
(Ptr{Cvoid}, Int, UInt, Int), env_handle, attribute, value, 0)
end

# SQLGetEnvAttr
Expand Down Expand Up @@ -247,7 +243,7 @@ const SQL_CD_FALSE = 0
"http://msdn.microsoft.com/en-us/library/windows/desktop/ms710297(v=vs.85).aspx"
function SQLGetConnectAttr(dbc::Ptr{Cvoid},attribute::Int,value::Array{T,N},bytes_returned::Array{Int,1}) where {T,N}
@odbc(:SQLGetConnectAttrW,
(Ptr{Cvoid},Int,Ptr{T},Int,Ptr{Int}),
(Ptr{Cvoid},Int,Ptr{Cvoid},Int,Ptr{Int}),
dbc,attribute,value,sizeof(T)*N,bytes_returned)
end

Expand Down Expand Up @@ -283,7 +279,7 @@ end
"http://msdn.microsoft.com/en-us/library/windows/desktop/ms715438(v=vs.85).aspx"
function SQLGetStmtAttr(stmt::Ptr{Cvoid},attribute::Int,value::Array{T,N},bytes_returned::Array{Int,1}) where {T,N}
@odbc(:SQLGetStmtAttrW,
(Ptr{Cvoid},Int,Ptr{T},Int,Ptr{Int}),
(Ptr{Cvoid},Int,Ptr{Cvoid},Int,Ptr{Int}),
stmt,attribute,value,sizeof(T)*N,bytes_returned)
end

Expand All @@ -308,14 +304,14 @@ end
"http://msdn.microsoft.com/en-us/library/windows/desktop/ms713560(v=vs.85).aspx"
function SQLSetDescField(desc::Ptr{Cvoid},i::Int16,field_id::Int16,value::Array{T,N}) where {T,N}
@odbc(:SQLSetDescFieldW,
(Ptr{Cvoid},Int16,Int16,Ptr{T},Int),
(Ptr{Cvoid},Int16,Int16,Ptr{Cvoid},Int),
desc,i,field_id,value,length(value))
end

"http://msdn.microsoft.com/en-us/library/windows/desktop/ms716370(v=vs.85).aspx"
function SQLGetDescField(desc::Ptr{Cvoid},i::Int16,attribute::Int16,value::Array{T,N},bytes_returned::Array{Int,1}) where {T,N}
@odbc(:SQLGetDescFieldW,
(Ptr{Cvoid},Int16,Int16,Ptr{T},Int,Ptr{Int}),
(Ptr{Cvoid},Int16,Int16,Ptr{Cvoid},Int,Ptr{Int}),
desc,i,attribute,value,sizeof(T)*N,bytes_returned)
end

Expand Down Expand Up @@ -395,7 +391,7 @@ end
"http://msdn.microsoft.com/en-us/library/windows/desktop/ms711681(v=vs.85).aspx"
function SQLGetInfo(dbc::Ptr{Cvoid},attribute::Int,value::Array{T,N},bytes_returned::Array{Int,1}) where {T,N}
@odbc(:SQLGetInfoW,
(Ptr{Cvoid},Int,Ptr{T},Int,Ptr{Int}),
(Ptr{Cvoid},Int,Ptr{Cvoid},Int,Ptr{Int}),
dbc,attribute,value,sizeof(T)*N,bytes_returned)
end

Expand Down Expand Up @@ -425,7 +421,7 @@ end
"http://msdn.microsoft.com/en-us/library/windows/desktop/ms713824(v=vs.85).aspx"
function SQLPutData(stmt::Ptr{Cvoid},data::Array{T},data_length::Int) where {T}
@odbc(:SQLPutData,
(Ptr{Cvoid},Ptr{T},Int),
(Ptr{Cvoid},Ptr{Cvoid},Int),
stmt,data,data_length)
end

Expand Down Expand Up @@ -596,7 +592,7 @@ const SQL_LOCK_UNLOCK = UInt16(2) #SQLSetPos
"http://msdn.microsoft.com/en-us/library/windows/desktop/ms713507(v=vs.85).aspx"
function SQLSetPos(stmt::Ptr{Cvoid},rownumber::T,operation::UInt16,lock_type::UInt16) where {T}
@odbc(:SQLSetPos,
(Ptr{Cvoid},T,UInt16,UInt16),
(Ptr{Cvoid},UInt64,UInt16,UInt16),
stmt,rownumber,operation,lock_type)
end #T can be Uint64 or UInt16 it seems

Expand Down
105 changes: 78 additions & 27 deletions src/ODBC.jl
@@ -1,8 +1,8 @@
module ODBC

using DataStreams, Missings, CategoricalArrays, WeakRefStrings, DataFrames, Dates
using Tables, CategoricalArrays, WeakRefStrings, DataFrames, Dates, DecFP

export Data, DataFrame, odbcdf
export DataFrame, odbcdf

include("API.jl")

Expand Down Expand Up @@ -155,27 +155,86 @@ mutable struct Statement
task::Task
end

"An `ODBC.Source` type executes a `query` string upon construction and prepares data for streaming to an appropriate `Data.Sink`"
mutable struct Source{T} <: Data.Source
schema::Data.Schema
dsn::DSN
query::String
columns::T
status::Int
rowsfetched::Ref{ODBC.API.SQLLEN}
rowoffset::Int
boundcols::Vector{Any}
indcols::Vector{Vector{ODBC.API.SQLLEN}}
sizes::Vector{ODBC.API.SQLULEN}
ctypes::Vector{ODBC.API.SQLSMALLINT}
jltypes::Vector{Type}
supportsreset::Bool
# used to 'clear' a statement of bound columns, resultsets,
# and other bound parameters in preparation for a subsequent query
function ODBCFreeStmt!(stmt)
ODBC.API.SQLFreeStmt(stmt, ODBC.API.SQL_CLOSE)
ODBC.API.SQLFreeStmt(stmt, ODBC.API.SQL_UNBIND)
ODBC.API.SQLFreeStmt(stmt, ODBC.API.SQL_RESET_PARAMS)
end

Base.show(io::IO, source::Source) = print(io, "ODBC.Source:\n\tDSN: $(source.dsn)\n\tstatus: $(source.status)\n\tschema: $(source.schema)")
# "Allocate ODBC handles for interacting with the ODBC Driver Manager"
function ODBCAllocHandle(handletype, parenthandle)
handle = Ref{Ptr{Cvoid}}()
API.SQLAllocHandle(handletype, parenthandle, handle)
handle = handle[]
if handletype == API.SQL_HANDLE_ENV
API.SQLSetEnvAttr(handle, API.SQL_ATTR_ODBC_VERSION, API.SQL_OV_ODBC3)
end
return handle
end

# "Alternative connect function that allows user to create datasources on the fly through opening the ODBC admin"
function ODBCDriverConnect!(dbc::Ptr{Cvoid}, conn_string, prompt::Bool)
@static if Sys.iswindows()
driver_prompt = prompt ? API.SQL_DRIVER_PROMPT : API.SQL_DRIVER_NOPROMPT
window_handle = prompt ? ccall((:GetForegroundWindow, :user32), Ptr{Cvoid}, () ) : C_NULL
else
driver_prompt = API.SQL_DRIVER_NOPROMPT
window_handle = C_NULL
end
out_conn = Block(API.SQLWCHAR, BUFLEN)
out_buff = Ref{Int16}()
@CHECK dbc API.SQL_HANDLE_DBC API.SQLDriverConnect(dbc, window_handle, conn_string, out_conn.ptr, BUFLEN, out_buff, driver_prompt)
connection_string = string(out_conn, out_buff[])
return connection_string
end

"`prepare` prepares an SQL statement to be executed"
function prepare(dsn::DSN, query::AbstractString)
stmt = ODBCAllocHandle(API.SQL_HANDLE_STMT, dsn.dbc_ptr)
@CHECK stmt API.SQL_HANDLE_STMT API.SQLPrepare(stmt, query)
return Statement(dsn, stmt, query, Task(1))
end

function execute!(statement::Statement, values)
stmt = statement.stmt
values2 = Any[cast(x) for x in values]
pointers = Ptr[]
types = map(typeof, values2)
for (i, v) in enumerate(values2)
if ismissing(v)
@CHECK stmt API.SQL_HANDLE_STMT API.SQLBindParameter(stmt, i, API.SQL_PARAM_INPUT,
API.SQL_C_CHAR, API.SQL_CHAR, 0, 0, C_NULL, 0, Ref(API.SQL_NULL_DATA))
else
ctype, sqltype = API.julia2C[types[i]], API.julia2SQL[types[i]]
csize, len, dgts = sqllength(v), clength(v), digits(v)
ptr = getpointer(types[i], values2, i)
# println("ctype: $ctype, sqltype: $sqltype, digits: $dgts, len: $len, csize: $csize")
push!(pointers, ptr)
@CHECK stmt API.SQL_HANDLE_STMT API.SQLBindParameter(stmt, i, API.SQL_PARAM_INPUT,
ctype, sqltype, csize, dgts, ptr, len, Ref(len))
end
end
execute!(statement)
return
end

function execute!(statement::Statement)
stmt = statement.stmt
@CHECK stmt API.SQL_HANDLE_STMT API.SQLExecute(stmt)
return
end

"`execute!` is a minimal method for just executing an SQL `query` string. No results are checked for or returned."
function execute!(dsn::DSN, query::AbstractString, stmt=dsn.stmt_ptr)
ODBCFreeStmt!(stmt)
@CHECK stmt API.SQL_HANDLE_STMT API.SQLExecDirect(stmt, query)
return
end

include("Source.jl")
include("Sink.jl")
include("Query.jl")
# include("sqlreplmode.jl")

const ENV = Ref{Ptr{Cvoid}}()
Expand All @@ -184,12 +243,4 @@ function __init__()
ENV[] = ODBC.ODBCAllocHandle(ODBC.API.SQL_HANDLE_ENV, ODBC.API.SQL_NULL_HANDLE)
end

# used to 'clear' a statement of bound columns, resultsets,
# and other bound parameters in preparation for a subsequent query
function ODBCFreeStmt!(stmt)
ODBC.API.SQLFreeStmt(stmt, ODBC.API.SQL_CLOSE)
ODBC.API.SQLFreeStmt(stmt, ODBC.API.SQL_UNBIND)
ODBC.API.SQLFreeStmt(stmt, ODBC.API.SQL_RESET_PARAMS)
end

end #ODBC module

0 comments on commit 41b7b6e

Please sign in to comment.