Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HiveServer2 support. Thrift SASL Client transport #6

Merged
merged 3 commits into from
Jun 16, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.rdoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
= RBHive -- Ruby thrift lib for executing Hive queries

A simple library to execute Hive queries against the Hive thrift server.
Also supports PLAIN SASL connection to thrift server (HiveServer2)

== Example to fetch some results

Expand All @@ -12,6 +13,16 @@ A simple library to execute Hive queries against the Hive thrift server.
end
➔ [{:city => "London", :country => "UK"}, {:city => "Mumbai", :country => "India"}, {:city => "New York", :country => "USA"}]

HiveServer2:

RBHive.tcli_connect('hive.server2.address', 'hive.server2.port', 'hive.server2.connection_options_hash') do |connection|
connection.fetch 'SELECT city, country FROM cities'
end
➔ [{:city => "London", :country => "UK"}, {:city => "Mumbai", :country => "India"}, {:city => "New York", :country => "USA"}]

where connection_options_hash = { username: 'user', password: 'pass'} or an empty(nil) is accepted
It is in progress to implement LDAP/KERBEROS authentication as well. Only NONE supported currently.

== Example to execute a query

require 'rubygems'
Expand Down
5 changes: 4 additions & 1 deletion lib/rbhive.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@
require File.join(File.dirname(__FILE__), 'rbhive', 'table_schema')
require File.join(File.dirname(__FILE__), 'rbhive', 'result_set')
require File.join(File.dirname(__FILE__), 'rbhive', 'explain_result')
require File.join(File.dirname(__FILE__), 'rbhive', 'schema_definition')
require File.join(File.dirname(__FILE__), 'rbhive', 'schema_definition')
require File.join(File.dirname(__FILE__), *%w[rbhive t_c_l_i_result_set])
require File.join(File.dirname(__FILE__), *%w[rbhive t_c_l_i_schema_definition])
require File.join(File.dirname(__FILE__), *%w[rbhive t_c_l_i_connection])
165 changes: 165 additions & 0 deletions lib/rbhive/t_c_l_i_connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# suppress warnings
old_verbose, $VERBOSE = $VERBOSE, nil

raise 'Thrift is not loaded' unless defined?(Thrift)
raise 'RBHive is not loaded' unless defined?(RBHive)

# require thrift autogenerated files
require File.join(File.dirname(__FILE__), *%w[.. thrift t_c_l_i_service_constants])
require File.join(File.dirname(__FILE__), *%w[.. thrift t_c_l_i_service])
require File.join(File.dirname(__FILE__), *%w[.. thrift sasl_client_transport])

# restore warnings
$VERBOSE = old_verbose


module RBHive
def tcli_connect(server, port=10_000, sasl_params={})
connection = RBHive::TCLIConnection.new(server, port, sasl_params)
ret = nil
begin
connection.open
connection.open_session
ret = yield(connection)
ensure
connection.close_session if connection.session
connection.close
ret
end
end
module_function :tcli_connect

class StdOutLogger
%w(fatal error warn info debug).each do |level|
define_method level.to_sym do |message|
STDOUT.puts(message)
end
end
end

class TCLIConnection
attr_reader :client

def initialize(server, port=10_000, sasl_params=nil, logger=StdOutLogger.new)
@socket = Thrift::Socket.new(server, port)
@socket.timeout = 1800
@logger = logger
if sasl_params.present?
@logger.info("Initializing transport with SASL support")
@transport = Thrift::SaslClientTransport.new(@socket, sasl_params)
@sasl_params = case sasl_params
when Hash then sasl_params.symbolize_keys
when Hashie::Mash then sasl_params.to_hash(symbolize_keys: true)
else nil
end
else
@transport = Thrift::BufferedTransport.new(@socket)
end
@protocol = Thrift::BinaryProtocol.new(@transport)
@client = TCLIService::Client.new(@protocol)
@session = nil
@logger.info("Connecting to HiveServer2 #{server} on port #{port}")
@mutex = Mutex.new
end

def open
@transport.open
end

def close
@transport.close
end

def open_session
@session = @client.OpenSession(prepare_open_session)
end

def close_session
@client.CloseSession prepare_close_session
@session = nil
end

def session
@session && @session.sessionHandle
end

def client
@client
end

def execute(query)
execute_safe(query)
end

def priority=(priority)
set("mapred.job.priority", priority)
end

def queue=(queue)
set("mapred.job.queue.name", queue)
end

def set(name,value)
@logger.info("Setting #{name}=#{value}")
self.execute("SET #{name}=#{value}")
end

def fetch(query)
safe do
op_handle = execute_unsafe(query).operationHandle
fetch_req = prepare_fetch_results(op_handle)
fetch_results = client.FetchResults(fetch_req)
raise fetch_results.status.try(:errorMessage, 'Execution failed!').to_s if fetch_results.status.statusCode != 0
rows = fetch_results.results.rows
the_schema = TCLISchemaDefinition.new(get_schema_for( op_handle ), rows.first)
TCLIResultSet.new(rows, the_schema)
end
end

def method_missing(meth, *args)
client.send(meth, *args)
end

private

def execute_safe(query)
safe { execute_unsafe(query) }
end

def execute_unsafe(query)
@logger.info("Executing Hive Query: #{query}")
req = prepare_execute_statement(query)
client.ExecuteStatement(req)
end

def safe
ret = nil
@mutex.synchronize { ret = yield }
ret
end

def prepare_open_session
TOpenSessionReq.new( @sasl_params.presence )
end

def prepare_close_session
TCloseSessionReq.new( sessionHandle: self.session )
end

def prepare_execute_statement(query)
TExecuteStatementReq.new( sessionHandle: self.session, statement: query.to_s )
end

def prepare_fetch_results(handle, orientation=:first, rows=100)
orientation = orientation.to_s.upcase
orientation = 'FIRST' unless TFetchOrientation::VALID_VALUES.include?( "FETCH_#{orientation}" )
TFetchResultsReq.new( operationHandle: handle, orientation: eval("TFetchOrientation::FETCH_#{orientation}"), maxRows: rows )
end

def get_schema_for(handle)
req = TGetResultSetMetadataReq.new( operationHandle: handle )
metadata = client.GetResultSetMetadata( req )
metadata.schema
end
end
end
3 changes: 3 additions & 0 deletions lib/rbhive/t_c_l_i_result_set.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module RBHive
class TCLIResultSet < ResultSet; end
end
87 changes: 87 additions & 0 deletions lib/rbhive/t_c_l_i_schema_definition.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
require 'json'

module RBHive
class TCLISchemaDefinition
attr_reader :schema

TYPES = {
:boolean => :to_s,
:string => :to_s,
:bigint => :to_i,
:float => :to_f,
:double => :to_f,
:int => :to_i,
:bigint => :to_i,
:smallint => :to_i,
:tinyint => :to_i,
}

def initialize(schema, example_row)
@schema = schema
@example_row = example_row ? example_row.colVals : []
end

def column_names
@column_names ||= begin
schema_names = @schema.columns.map {|c| c.columnName }

# In rare cases Hive can return two identical column names
# consider SELECT a.foo, b.foo...
# in this case you get two columns called foo with no disambiguation.
# as a (far from ideal) solution we detect this edge case and rename them
# a.foo => foo1, b.foo => foo2
# otherwise we will trample one of the columns during Hash mapping.
s = Hash.new(0)
schema_names.map! { |c| s[c] += 1; s[c] > 1 ? "#{c}---|---#{s[c]}" : c }
schema_names.map! { |c| s[c] > 1 ? "#{c}---|---1" : c }
schema_names.map! { |c| c.gsub('---|---', '_').to_sym }

# Lets fix the fact that Hive doesn't return schema data for partitions on SELECT * queries
# For now we will call them :_p1, :_p2, etc. to avoid collisions.
offset = 0
while schema_names.length < @example_row.length
schema_names.push(:"_p#{offset+=1}")
end
schema_names
end
end

def column_type_map
@column_type_map ||= column_names.inject({}) do |hsh, c|
definition = @schema.columns.find {|s| s.columnName.to_sym == c }
# If the column isn't in the schema (eg partitions in SELECT * queries) assume they are strings
type = TYPE_NAMES[definition.typeDesc.types.first.primitiveEntry.type].downcase rescue nil
hsh[c] = definition && type ? type.to_sym : :string
hsh
end
end

def coerce_row(row)
column_names.zip(row.colVals.map(&:get_value).map(&:value)).inject({}) do |hsh, (column_name, value)|
hsh[column_name] = coerce_column(column_name, value)
hsh
end
end

def coerce_column(column_name, value)
type = column_type_map[column_name]
return 1.0/0.0 if(type != :string && value == "Infinity")
return 0.0/0.0 if(type != :string && value == "NaN")
return nil if value.nil? || value == 'NULL' || value == 'null'
return coerce_complex_value(value) if type.to_s =~ /^array/
conversion_method = TYPES[type]
conversion_method ? value.send(conversion_method) : value
end

def coerce_row_to_array(row)
column_names.map { |n| row[n] }
end

def coerce_complex_value(value)
return nil if value.nil?
return nil if value.length == 0
return nil if value == 'null'
JSON.parse(value)
end
end
end
94 changes: 94 additions & 0 deletions lib/thrift/sasl_client_transport.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
module Thrift
class SaslClientTransport < BufferedTransport
attr_reader :challenge

STATUS_BYTES = 1
PAYLOAD_LENGTH_BYTES = 4
AUTH_MECHANISM = 'PLAIN'
NEGOTIATION_STATUS = {
START: 0x01,
OK: 0x02,
BAD: 0x03,
ERROR: 0x04,
COMPLETE: 0x05
}

def initialize(transport, sasl_params={})
super(transport)
@challenge = nil
@sasl_username = sasl_params.fetch(:username, 'anonymous')
@sasl_password = sasl_params.fetch(:password, 'anonymous')
end

def read(sz)
len, = @transport.read(PAYLOAD_LENGTH_BYTES).unpack('l>') if @rbuf.nil?
sz = len if len && sz > len
@index += sz
ret = @rbuf.slice(@index - sz, sz) || Bytes.empty_byte_buffer
if ret.length == 0
@rbuf = @transport.read(len) rescue Bytes.empty_byte_buffer
@index = sz
ret = @rbuf.slice(0, sz) || Bytes.empty_byte_buffer
end
ret
end

def read_byte
reset_buffer! if @index >= @rbuf.size
@index += 1
Bytes.get_string_byte(@rbuf, @index - 1)
end

def read_into_buffer(buffer, size)
i = 0
while i < size
reset_buffer! if @index >= @rbuf.size
byte = Bytes.get_string_byte(@rbuf, @index)
Bytes.set_string_byte(buffer, i, byte)
@index += 1
i += 1
end
i
end

def write(buf)
initiate_hand_shake if @challenge.nil?
header = [buf.length].pack('l>')
@wbuf << (header + Bytes.force_binary_encoding(buf))
end

protected

def initiate_hand_shake
header = [NEGOTIATION_STATUS[:START], AUTH_MECHANISM.length].pack('cl>')
@transport.write header + AUTH_MECHANISM
message = "[#{AUTH_MECHANISM}]\u0000#{@sasl_username}\u0000#{@sasl_password}"
header = [NEGOTIATION_STATUS[:OK], message.length].pack('cl>')
@transport.write header + message
status, len = @transport.read(STATUS_BYTES + PAYLOAD_LENGTH_BYTES).unpack('cl>')
case status
when NEGOTIATION_STATUS[:BAD], NEGOTIATION_STATUS[:ERROR]
raise @transport.to_io.read(len)
when NEGOTIATION_STATUS[:COMPLETE]
@challenge = @transport.to_io.read len
when NEGOTIATION_STATUS[:OK]
raise "Failed to complete challenge exchange: only NONE supported currently"
end
end

private

def reset_buffer!
len, = @transport.read(PAYLOAD_LENGTH_BYTES).unpack('l>')
@rbuf = @transport.read(len)
@index = 0
end
end

class SaslClientTransportFactory < BaseTransportFactory
def get_transport(transport)
return SaslClientTransport.new(transport)
end
end

end
Loading