Skip to content

Commit 6afea60

Browse files
lskuffHenry Robinson
authored andcommitted
Update test logging to print executable SQL statements and log all actions executed
This is the first step in cleaning up the test logging. It provides a common connection interface that provides tracing around all operations. When a test fails the output will be executable SQL. It also logs actions such as when a connection is opened, close, or when an operation is cancelled. Currently only beeswax connections are supported, but I have a seperate patch that adds support for executing using HS2 as well as Beeswax. Example of new logging: -- connecting to: localhost:21000 -- executing against localhost:21000 use functional; SET disable_codegen=False; SET abort_on_error=1; SET batch_size=0; SET num_nodes=0; -- executing against localhost:21000 select a.timestamp_col from alltypessmall a inner join alltypessmall b on (a.timestamp_col = b.timestamp_col) where a.year=2009 and a.month=1 and b.year=2009 and b.month=1; -- closing connection to: localhost:21000 Change-Id: Iedc7d4d3a84bfeff6cc1daae6ed1ca97613d7700 Reviewed-on: http://gerrit.ent.cloudera.com:8080/1133 Tested-by: jenkins Reviewed-by: Lenni Kuff <lskuff@cloudera.com>
1 parent 6483f53 commit 6afea60

File tree

12 files changed

+231
-54
lines changed

12 files changed

+231
-54
lines changed

testdata/workloads/functional-query/queries/QueryTest/hdfs-scan-node.test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,8 +701,8 @@ bigint
701701
---- RESULTS
702702
10000
703703
====
704-
# Select from old (pre-hive 9) rc file table
705704
---- QUERY
705+
# Select from old (pre-hive 9) rc file table
706706
SELECT * FROM functional_rc.old_rcfile_table
707707
---- TYPES
708708
int, string

tests/beeswax/impala_beeswax.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def __options_to_string_list(self):
9595
return ["%s=%s" % (k,v) for (k,v) in self.__query_options.iteritems()]
9696

9797
def get_query_options(self):
98-
return '\n'.join(["\t%s: %s" % (k,v) for (k,v) in self.__query_options.iteritems()])
98+
return self.__query_options
9999

100100
def set_query_option(self, name, value):
101101
self.__query_options[name.upper()] = value
@@ -192,9 +192,15 @@ def wait_for_completion(self, query_handle):
192192
raise ImpalaBeeswaxException("Query aborted:" + error_log, None)
193193
time.sleep(0.05)
194194

195+
def get_default_configuration(self):
196+
return self.__do_rpc(lambda: self.imp_service.get_default_configuration(False))
197+
195198
def get_state(self, query_handle):
196199
return self.__do_rpc(lambda: self.imp_service.get_state(query_handle))
197200

201+
def get_log(self, query_handle):
202+
return self.__do_rpc(lambda: self.imp_service.get_log(query_handle))
203+
198204
def refresh(self):
199205
"""Invalidate the Impalad catalog"""
200206
return self.execute("invalidate metadata")

tests/common/impala_connection.py

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
#!/usr/bin/env python
2+
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
# Common for connections to Impala. Currently supports Beeswax connections and
17+
# in the future will support HS2 connections. Provides tracing around all
18+
# operations.
19+
20+
from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient, QueryResult
21+
from thrift.transport.TSocket import TSocket
22+
from thrift.protocol import TBinaryProtocol
23+
from thrift.transport.TTransport import TBufferedTransport, TTransportException
24+
from getpass import getuser
25+
26+
import abc
27+
import logging
28+
import os
29+
30+
LOG = logging.getLogger('impala_connection')
31+
console_handler = logging.StreamHandler()
32+
console_handler.setLevel(logging.INFO)
33+
# All logging needs to be either executable SQL or a SQL comment (prefix with --).
34+
console_handler.setFormatter(logging.Formatter('%(message)s'))
35+
LOG.addHandler(console_handler)
36+
LOG.propagate = False
37+
38+
# Common wrapper around the internal types of HS2/Beeswax operation/query handles.
39+
class OperationHandle(object):
40+
def __init__(self, handle):
41+
self.__handle = handle
42+
43+
def get_handle(self): return self.__handle
44+
45+
46+
# Represents an Impala connection.
47+
class ImpalaConnection(object):
48+
__metaclass__ = abc.ABCMeta
49+
50+
@abc.abstractmethod
51+
def set_configuration_option(self, name, value):
52+
"""Sets a configuraiton option name to the given value"""
53+
pass
54+
55+
@abc.abstractmethod
56+
def get_configuration(self):
57+
"""Returns the configuration (a dictionary of key-value pairs) for this connection"""
58+
pass
59+
60+
@abc.abstractmethod
61+
def set_configuration(self, configuration_option_dict):
62+
"""Replaces existing configuration with the given dictionary"""
63+
pass
64+
65+
@abc.abstractmethod
66+
def clear_configuration(self):
67+
"""Clears all existing configuration."""
68+
pass
69+
70+
@abc.abstractmethod
71+
def connect(self):
72+
"""Opens the connection"""
73+
pass
74+
75+
@abc.abstractmethod
76+
def close(self):
77+
"""Closes the connection. Can be called multiple times"""
78+
pass
79+
80+
@abc.abstractmethod
81+
def get_state(self, operation_handle):
82+
"""Returns the state of a query"""
83+
pass
84+
85+
@abc.abstractmethod
86+
def get_log(self, operation_handle):
87+
"""Returns the log of an operation"""
88+
pass
89+
90+
@abc.abstractmethod
91+
def cancel(self, operation_handle):
92+
"""Cancels an in-flight operation"""
93+
pass
94+
95+
def execute(self, sql_stmt):
96+
"""Executes a query and fetches the results"""
97+
pass
98+
99+
@abc.abstractmethod
100+
def execute_async(self, sql_stmt):
101+
"""Issues a query and returns the handle to the caller for processing"""
102+
pass
103+
104+
@abc.abstractmethod
105+
def fetch(self, sql_stmt, operation_handle, batch_size=1024):
106+
"""Fetches all query results given a handle and sql statement.
107+
TODO: Support fetching single batch"""
108+
pass
109+
110+
111+
# Represents a connection to Impala using the Beeswax API.
112+
class BeeswaxConnection(ImpalaConnection):
113+
def __init__(self, host_port, use_kerberos=False):
114+
self.__beeswax_client = ImpalaBeeswaxClient(host_port, use_kerberos)
115+
self.__host_port = host_port
116+
self.QUERY_STATES = self.__beeswax_client.query_states
117+
118+
def set_configuration_option(self, name, value):
119+
# Only set the option if it's not already set to the same value.
120+
if self.__beeswax_client.get_query_option(name) != value:
121+
LOG.info('SET %s=%s;' % (name, value))
122+
self.__beeswax_client.set_query_option(name, value)
123+
124+
def get_configuration(self):
125+
return self.__beeswax_client.get_query_options
126+
127+
def set_configuration(self, config_option_dict):
128+
assert config_option_dict is not None, "config_option_dict cannot be None"
129+
self.clear_configuration()
130+
for name, value in config_option_dict.iteritems():
131+
self.set_configuration_option(name, value)
132+
133+
def clear_configuration(self):
134+
self.__beeswax_client.clear_query_options()
135+
136+
def connect(self):
137+
LOG.info("-- connecting to: %s" % self.__host_port)
138+
self.__beeswax_client.connect()
139+
140+
def close(self):
141+
LOG.info("-- closing connection to: %s" % self.__host_port)
142+
self.__beeswax_client.close_connection()
143+
144+
def execute(self, sql_stmt):
145+
LOG.info("-- executing against %s\n%s;\n" % (self.__host_port, sql_stmt))
146+
return self.__beeswax_client.execute(sql_stmt)
147+
148+
def execute_async(self, sql_stmt):
149+
LOG.info("-- executing async: %s\n%s;\n" % (self.__host_port, sql_stmt))
150+
return OperationHandle(self.__beeswax_client.execute_query_async(sql_stmt))
151+
152+
def cancel(self, operation_handle):
153+
LOG.info("-- canceling operation: %s" % operation_handle)
154+
return self.__beeswax_client.cancel_query(operation_handle.get_handle())
155+
156+
def get_state(self, operation_handle):
157+
LOG.info("-- getting state for operation: %s" % operation_handle)
158+
return self.__beeswax_client.get_state(operation_handle.get_handle())
159+
160+
def get_log(self, operation_handle):
161+
LOG.info("-- getting log for operation: %s" % operation_handle)
162+
return self.__beeswax_client.get_log(operation_handle.get_handle())
163+
164+
def refresh(self):
165+
"""Invalidate the Impalad catalog"""
166+
return self.execute("invalidate metadata")
167+
168+
def refresh_table(self, db_name, table_name):
169+
"""Refresh a specific table from the catalog"""
170+
return self.execute("refresh %s.%s" % (db_name, table_name))
171+
172+
def fetch(self, sql_stmt, operation_handle):
173+
LOG.info("-- fetching results from: %s" % operation_handle)
174+
return self.__beeswax_client.fetch_results(sql_stmt, operation_handle.get_handle())
175+
176+
def create_connection(host_port, use_kerberos=False):
177+
# TODO: Support HS2 connections.
178+
return BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos)

tests/common/impala_service.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
from collections import defaultdict
2727
from HTMLParser import HTMLParser
28-
from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
28+
from tests.common.impala_connection import ImpalaConnection, create_connection
2929
from time import sleep, time
3030

3131
logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s')
@@ -159,8 +159,7 @@ def wait_for_query_status(self, client, query_id, expected_content,
159159

160160
def create_beeswax_client(self, use_kerberos=False):
161161
"""Creates a new beeswax client connection to the impalad"""
162-
client =\
163-
ImpalaBeeswaxClient('%s:%d' % (self.hostname, self.beeswax_port), use_kerberos)
162+
client = create_connection('%s:%d' % (self.hostname, self.beeswax_port), use_kerberos)
164163
client.connect()
165164
return client
166165

tests/common/impala_test_suite.py

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import pytest
2222
from functools import wraps
2323
from random import choice
24-
from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
2524
from tests.common.impala_service import ImpaladService
25+
from tests.common.impala_connection import ImpalaConnection, create_connection
2626
from tests.common.test_dimensions import *
2727
from tests.common.test_result_verifier import *
2828
from tests.common.test_vector import *
@@ -38,7 +38,7 @@
3838
from thrift.transport import TTransport, TSocket
3939
from thrift.protocol import TBinaryProtocol
4040

41-
logging.basicConfig(level=logging.INFO, format='%(threadName)s: %(message)s')
41+
logging.basicConfig(level=logging.INFO, format='-- %(message)s')
4242
LOG = logging.getLogger('impala_test_suite')
4343

4444
IMPALAD_HOST_PORT_LIST = pytest.config.option.impalad.split(',')
@@ -80,7 +80,7 @@ def setup_class(cls):
8080
cls.hive_client = ThriftHiveMetastore.Client(protocol)
8181
cls.hive_transport.open()
8282

83-
# The ImpalaBeeswaxClient is used to execute queries in the test suite
83+
# Create a connection to Impala.
8484
cls.client = cls.create_impala_client(IMPALAD)
8585

8686
cls.impalad_test_service = ImpaladService(IMPALAD.split(':')[0])
@@ -98,19 +98,19 @@ def teardown_class(cls):
9898
cls.hive_transport.close()
9999

100100
if cls.client:
101-
cls.client.close_connection()
101+
cls.client.close()
102102

103103
@classmethod
104104
def create_impala_client(cls, host_port=IMPALAD):
105-
client = ImpalaBeeswaxClient(host_port,
105+
client = create_connection(host_port=host_port,
106106
use_kerberos=pytest.config.option.use_kerberos)
107107
client.connect()
108108
return client
109109

110110
def cleanup_db(self, db_name):
111111
# To drop a db, we need to first drop all the tables in that db
112112
self.client.execute("use default")
113-
self.client.set_query_options({'sync_ddl': 1})
113+
self.client.set_configuration({'sync_ddl': 1})
114114
if db_name in self.client.execute("show databases", ).data:
115115
for tbl_name in self.client.execute("show tables in " + db_name).data:
116116
full_tbl_name = '%s.%s' % (db_name, tbl_name)
@@ -148,6 +148,7 @@ def run_test_case(self, test_file_name, vector, use_db=None, multiple_impalad=Fa
148148
# user specified database for all targeted impalad.
149149
for impalad_client in target_impalad_clients:
150150
ImpalaTestSuite.change_database(impalad_client, table_format_info, use_db)
151+
impalad_client.set_configuration(exec_options)
151152

152153
sections = self.load_query_test_file(self.get_workload(), test_file_name)
153154
for test_section in sections:
@@ -172,7 +173,7 @@ def run_test_case(self, test_file_name, vector, use_db=None, multiple_impalad=Fa
172173
target_impalad_client = choice(target_impalad_clients)
173174
for query in query.split(';'):
174175
result =\
175-
self.execute_query_expect_success(target_impalad_client, query, exec_options)
176+
self.execute_query_expect_success(target_impalad_client, query)
176177
assert result is not None
177178

178179
verify_raw_results(test_section, result,
@@ -219,7 +220,7 @@ def change_database(cls, impala_client, table_format=None, db_name=None):
219220
query = 'use %s' % db_name
220221
# Clear the exec_options before executing a USE statement.
221222
# The USE statement should not fail for negative exec_option tests.
222-
impala_client.clear_query_options()
223+
impala_client.clear_configuration()
223224
impala_client.execute(query)
224225

225226
def execute_wrapper(function):
@@ -247,28 +248,28 @@ def wrapper(*args, **kwargs):
247248
return wrapper
248249

249250
@execute_wrapper
250-
def execute_query_expect_success(self, impalad_client, query, query_exec_options=None):
251+
def execute_query_expect_success(self, impalad_client, query, query_options=None):
251252
"""Executes a query and asserts if the query fails"""
252-
result = self.__execute_query(impalad_client, query, query_exec_options)
253+
result = self.__execute_query(impalad_client, query, query_options)
253254
assert result.success
254255
return result
255256

256257
@execute_wrapper
257-
def execute_query(self, query, query_exec_options=None):
258-
return self.__execute_query(self.client, query, query_exec_options)
258+
def execute_query(self, query, query_options=None):
259+
return self.__execute_query(self.client, query, query_options)
259260

260261
def execute_query_using_client(self, client, query, vector):
261262
self.change_database(client, vector.get_value('table_format'))
262263
return client.execute(query)
263264

264265
@execute_wrapper
265-
def execute_query_async(self, query, query_exec_options=None):
266-
self.__set_exec_options(self.client, query_exec_options)
267-
return self.client.execute_query_async(query)
266+
def execute_query_async(self, query, query_options=None):
267+
self.client.set_configuration(query_options)
268+
return self.client.execute_async(query)
268269

269270
@execute_wrapper
270-
def execute_scalar(self, query, query_exec_options=None):
271-
result = self.__execute_query(self.client, query, query_exec_options)
271+
def execute_scalar(self, query, query_options=None):
272+
result = self.__execute_query(self.client, query, query_options)
272273
assert len(result.data) <= 1, 'Multiple values returned from scalar'
273274
return result.data[0] if len(result.data) == 1 else None
274275

@@ -306,25 +307,18 @@ def __drop_partitions(self, db_name, table_name):
306307
for partition in self.hive_client.get_partition_names(db_name, table_name, 0):
307308
self.hive_client.drop_partition_by_name(db_name, table_name, partition, True)
308309

309-
def __execute_query(self, impalad_client, query, query_exec_options=None):
310+
def __execute_query(self, impalad_client, query, query_options=None):
310311
"""Executes the given query against the specified Impalad"""
311-
LOG.info('Executing Query(%s): \n%s\n' % (impalad_client.impalad, query))
312-
self.__set_exec_options(impalad_client, query_exec_options)
312+
if query_options is not None: impalad_client.set_configuration(query_options)
313313
return impalad_client.execute(query)
314314

315-
def __execute_query_new_client(self, query, query_exec_options=None,
315+
def __execute_query_new_client(self, query, query_options=None,
316316
use_kerberos=False):
317317
"""Executes the given query against the specified Impalad"""
318318
new_client = self.create_impala_client()
319-
self.__set_exec_options(new_client, query_exec_options)
319+
new_client.set_configuration(query_options)
320320
return new_client.execute(query)
321321

322-
def __set_exec_options(self, impalad_client, query_exec_options):
323-
# Set the specified query exec options, if specified
324-
impalad_client.clear_query_options()
325-
if query_exec_options is not None:
326-
impalad_client.set_query_options(query_exec_options)
327-
328322
def __reset_table(self, db_name, table_name):
329323
"""Resets a table (drops and recreates the table)"""
330324
table = self.hive_client.get_table(db_name, table_name)

tests/common/test_result_verifier.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ def __eq__(self, other):
153153
return compare_float(float(self.value), float(other.value), 10e-5)
154154
elif self.column_type == 'double':
155155
return compare_float(float(self.value), float(other.value), 10e-10)
156+
elif self.column_type == 'boolean':
157+
return str(self.value).lower() == str(other.value).lower()
156158
else:
157159
return self.value == other.value
158160

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def pytest_addoption(parser):
5353
"for the hive metastore client when using kerberos.")
5454

5555
parser.addoption("--use_kerberos", action="store_true", default=False,
56-
help="Use kerberos transport for running tests")
56+
help="use kerberos transport for running tests")
5757

5858
parser.addoption("--sanity", action="store_true", default=False,
5959
help="Runs a single test vector from each test to provide a quick "\

0 commit comments

Comments
 (0)