Permalink
Browse files

First commit to run the random query generator on Hive.

With this change, random query generator can run continuously on Hive
and approximately half of its generated queries are able to run.

1. Connect timeout from Impyla to HS2 was too small,
increasing it to match Impala's.
2. Query timeout to wait for Hive queries was too short,
making it configurable so we can play with different values.
3. Hive does not support 'with' clause in subquery,
but interestingly supports it at the top-level.
Added a profile flag "use_nested_with" to disable nested with's.
4. Hive does not support 'having' without 'group by'.
Added a profile flag "use_having_without_groupby" to always
generate a group by with having.
5. Hive does not support "interval" keyword for timestamp.
Added a profile 'restrict' list to restrict certain functions,
and added 'dateAdd' to this list for Hive.
6. Hive 'greatest' and 'least' UDFs do not do implicit type casting
like other databases.  Modified the query generator to only choose args of
the same type for these, and HiveSqlWriter to add a cast, as there
were still some lingering issues, like UDFs on int returning bigint.
7. Hive always orders the Nulls first in ORDER BY ASC,
opposite to other databases,
and does not have any 'NULLS FIRST' or 'NULLS LAST' option.
Thus the only workaround is to add a "nulls_order_asc" flag
to the profile, and pass it in to the ref database's SqlWriter
to generate the 'NULLS FIRST' or 'NULLS LAST' statement on that end.
8. Hive strangely does not support multiple sort keys in a window
without frame specification.  The workaround is for HiveSqlWriter
to add 'rows unbounded preceding' to specify the default frame if
there are no existing frames.

Change-Id: I2a5b07e37378f695de1b50af49845283468b4f0f
Reviewed-on: http://gerrit.cloudera.org:8080/619
Reviewed-by: Casey Ching <casey@cloudera.com>
Tested-by: Internal Jenkins
  • Loading branch information...
szehon authored and Internal Jenkins committed Aug 7, 2015
1 parent b561c83 commit 6dad2e8ba809beeab507fa373c1b7846fc97fe6f
@@ -86,6 +86,10 @@ def add_data_types_options(section):
help='A comma separated list of data types to use.')
+def add_timeout_option(section):
+ section.add_option('--timeout', default=(3 * 60), type=int, help='Query timeout in seconds')
+
+
def add_connection_option_groups(parser):
group = OptionGroup(parser, "Impala Options")
group.add_option('--impalad-host', default='localhost',
@@ -109,7 +109,8 @@ def create_connection(self, db_name=None):
host=self.host_name,
port=self.port,
ldap_user=self.user_name,
- ldap_password=self.password)
+ ldap_password=self.password,
+ timeout=maxint)
return HiveDbConnection(self, connection, user_name=self.user_name,
user_pass=self.password, db_name=db_name, hdfs_host=self.hdfs_host,
hdfs_port=self.hdfs_port)
@@ -54,20 +54,21 @@ class QueryResultComparator(object):
# The DECIMAL values will be rounded before comparison
DECIMAL_PLACES = 2
- def __init__(self, ref_connection, test_connection):
+ def __init__(self, query_profile, ref_connection, test_connection, query_timeout_seconds):
'''test/ref_connection arguments should be an instance of DbConnection'''
ref_cursor = ref_connection.create_cursor()
test_cursor = test_connection.create_cursor()
self.ref_connection = ref_connection
- self.ref_sql_writer = SqlWriter.create(dialect=ref_connection.db_type)
+ self.ref_sql_writer = SqlWriter.create(dialect=ref_connection.db_type,
+ nulls_order_asc=query_profile.nulls_order_asc())
self.test_connection = test_connection
self.test_sql_writer = SqlWriter.create(dialect=test_connection.db_type)
self.query_executor = QueryExecutor(
[ref_cursor, test_cursor],
[self.ref_sql_writer, self.test_sql_writer],
- query_timeout_seconds=(3 * 60))
+ query_timeout_seconds=query_timeout_seconds)
@property
def ref_db_type(self):
@@ -456,7 +457,8 @@ def __init__(self, query_profile, ref_connection, test_connection):
self.common_tables = DbConnection.describe_common_tables(
[ref_connection, test_connection])
- def search(self, number_of_test_queries, stop_on_result_mismatch, stop_on_crash):
+ def search(self, number_of_test_queries, stop_on_result_mismatch, stop_on_crash,
+ query_timeout_seconds):
'''Returns an instance of SearchResults, which is a summary report. This method
oversees the generation, execution, and comparison of queries.
@@ -465,7 +467,7 @@ def search(self, number_of_test_queries, stop_on_result_mismatch, stop_on_crash)
'''
start_time = time()
query_result_comparator = QueryResultComparator(
- self.ref_connection, self.test_connection)
+ self.query_profile, self.ref_connection, self.test_connection, query_timeout_seconds)
query_generator = QueryGenerator(self.query_profile)
query_count = 0
queries_resulted_in_data_count = 0
@@ -621,6 +623,7 @@ def __str__(self):
cli_options.add_logging_options(parser)
cli_options.add_db_name_option(parser)
cli_options.add_connection_option_groups(parser)
+ cli_options.add_timeout_option(parser)
parser.add_option('--test-db-type', default=IMPALA,
choices=(HIVE, IMPALA, MYSQL, ORACLE, POSTGRESQL),
@@ -669,7 +672,8 @@ def __str__(self):
# Create an instance of profile class (e.g. DefaultProfile)
query_profile = profiles[options.profile]()
diff_searcher = QueryResultDiffSearcher(query_profile, ref_connection, test_connection)
+ query_timeout_seconds = options.timeout
search_results = diff_searcher.search(
- options.query_count, options.stop_on_mismatch, options.stop_on_crash)
+ options.query_count, options.stop_on_mismatch, options.stop_on_crash, query_timeout_seconds)
print(search_results)
sys.exit(search_results.mismatch_count)
@@ -20,6 +20,7 @@
from tests.comparison.types import (
Char,
+ Decimal,
Float,
Int,
String,
@@ -37,25 +38,29 @@ class SqlWriter(object):
'''
@staticmethod
- def create(dialect='impala'):
+ def create(dialect='impala', nulls_order_asc='DEFAULT'):
'''Create and return a new SqlWriter appropriate for the given sql dialect. "dialect"
refers to database specific deviations of sql, and the val should be one of
"IMPALA", "MYSQL", or "POSTGRESQL".
'''
dialect = dialect.upper()
if dialect == 'IMPALA':
- return ImpalaSqlWriter()
+ return ImpalaSqlWriter(nulls_order_asc)
if dialect == 'MYSQL':
- return MySQLSqlWriter()
+ return MySQLSqlWriter(nulls_order_asc)
if dialect == 'ORACLE':
- return OracleSqlWriter()
+ return OracleSqlWriter(nulls_order_asc)
if dialect == 'POSTGRESQL':
- return PostgresqlSqlWriter()
+ return PostgresqlSqlWriter(nulls_order_asc)
if dialect == 'HIVE':
- return HiveSqlWriter()
+ return HiveSqlWriter(nulls_order_asc)
raise Exception('Unknown dialect: %s' % dialect)
- def __init__(self):
+ def __init__(self, nulls_order_asc):
+ if nulls_order_asc not in ('BEFORE', 'AFTER', 'DEFAULT'):
+ raise Exception('Unknown nulls order: %s' % nulls_order_asc)
+ self.nulls_order_asc = nulls_order_asc
+
# Functions that don't follow the usual call syntax of foo(bar, baz) can be listed
# here. Parenthesis were added everywhere to avoid problems with operator precedence.
# TODO: Account for operator precedence...
@@ -228,6 +233,9 @@ def _write_as_comma_list(self, items):
def _write_cast_as_char(self, func):
return 'CAST(%s AS %s)' % (self._write(func.args[0]), self._write(String))
+ def _write_cast(self, arg, type):
+ return 'CAST(%s AS %s)' % (self._write(arg), type)
+
def _write_date_add_year(self, func):
return "%s + INTERVAL %s YEAR" \
% (self._write(func.args[0]), self._write(func.args[1]))
@@ -330,6 +338,9 @@ def _write_order_by_clause(self, order_by_clause):
sql += self._write(expr)
if order:
sql += ' ' + order
+ nulls_order = self.get_nulls_order(order)
+ if nulls_order is not None:
+ sql += ' ' + nulls_order
return sql
def _write_limit_clause(self, limit_clause):
@@ -359,6 +370,21 @@ def _write(self, object_):
raise Exception('Unsupported object: %s<%s>' % (type(object_).__name__, object_))
+ def get_nulls_order(self, order):
+ if self.nulls_order_asc is None:
+ return None
+ nulls_order_asc = self.nulls_order_asc
+ if order == 'ASC':
+ if nulls_order_asc == 'BEFORE':
+ return 'NULLS FIRST'
+ if nulls_order_asc == 'AFTER':
+ return 'NULLS LAST'
+ if order == 'DESC':
+ if nulls_order_asc == 'BEFORE':
+ return 'NULLS LAST'
+ if nulls_order_asc == 'AFTER':
+ return 'NULLS FIRST'
+
def _to_py_name(self, name):
return sub('([A-Z])', r'_\1', name).lower().lstrip('_')
@@ -380,6 +406,60 @@ class HiveSqlWriter(SqlWriter):
DIALECT = 'HIVE'
+ # Hive greatest UDF is strict on type equality
+ # Hive Profile already restricts to signatures with the same types,
+ # but sometimes expression with UDF's like 'count'
+ # return an unpredictable type like 'bigint' unlike
+ # the query model, so cast is still necessary.
+ def _write_greatest(self, func):
+ args = func.args
+ if args[0].type in (Int, Decimal, Float):
+ argtype = args[0].type.__name__.lower()
+ sql = '%s(%s)' % \
+ (self._to_sql_name(func.name()),
+ (self._write_cast(args[0], argtype)
+ + ", "
+ + self._write_cast(args[1], argtype)))
+ else:
+ sql = self._write_func(func)
+ return sql
+
+ # Hive least UDF is strict on type equality
+ # Hive Profile already restricts to signatures with the same types,
+ # but sometimes expression with UDF's like 'count'
+ # return an unpredictable type like 'bigint' unlike
+ # the query model, so cast is still necessary.
+ def _write_least(self, func):
+ args = func.args
+ if args[0].type in (Int, Decimal, Float):
+ argtype = args[0].type.__name__.lower()
+ sql = '%s(%s)' % \
+ (self._to_sql_name(func.name()),
+ (self._write_cast(args[0], argtype)
+ + ", "
+ + self._write_cast(args[1], argtype)))
+ else:
+ sql = self._write_func(func)
+ return sql
+
+ # Hive partition by clause throws exception if sorted by more than one key, unless 'rows unbounded preceding' added.
+ def _write_analytic_func(self, func):
+ sql = self._to_sql_name(func.name()) \
+ + '(' + self._write_as_comma_list(func.args) \
+ + ') OVER ('
+ options = []
+ if func.partition_by_clause:
+ options.append(self._write(func.partition_by_clause))
+ if func.order_by_clause:
+ options.append(self._write(func.order_by_clause))
+ if func.window_clause:
+ options.append(self._write(func.window_clause))
+ if func.partition_by_clause and func.order_by_clause:
+ if len(func.order_by_clause.exprs_to_order) > 1:
+ if func.SUPPORTS_WINDOWING and func.window_clause is None:
+ options.append('rows unbounded preceding')
+ return sql + ' '.join(options) + ')'
+
class PostgresqlSqlWriter(SqlWriter):
@@ -434,6 +514,9 @@ def _write_order_by_clause(self, order_by_clause):
sql += self._write(expr)
if order:
sql += ' ' + order
+ nulls_order = self.get_nulls_order(order)
+ if (nulls_order is not None):
+ sql += ' ' + nulls_order
return sql
def _write_data_type(self, data_type):
@@ -507,4 +590,7 @@ def _write_order_by_clause(self, order_by_clause):
sql += 'ISNULL({0}), {0}'.format(self._write(expr))
if order:
sql += ' ' + order
+ nulls_order = self.get_nulls_order(order)
+ if (nulls_order is not None):
+ sql += ' ' + nulls_order
return sql
@@ -18,7 +18,8 @@
from logging import getLogger
from random import shuffle, choice, randint, randrange
-from tests.comparison.common import TableExprList, ValExpr, ValExprList
+from tests.comparison.common import TableExprList, ValExpr, ValExprList, Table, Column
+from tests.comparison.query_profile import DefaultProfile, HiveProfile
from tests.comparison.funcs import (
AGG_FUNCS,
AggFunc,
@@ -185,7 +186,9 @@ def create_query(self,
select_clause.distinct = True
if self.profile.use_having_clause() \
- and (query.group_by_clause or select_clause.agg_items):
+ and (query.group_by_clause
+ or (self.profile.use_having_without_groupby()
+ and select_clause.agg_items)):
basic_select_item_exprs = \
ValExprList(item.val_expr for item in select_clause.basic_items)
query.having_clause = self._create_having_clause(
@@ -479,7 +482,8 @@ def _populate_func_with_vals(self,
# Don't use UNION + LIMIT; https://issues.cloudera.org/browse/IMPALA-1379
allow_union_clause=(not signature_arg.is_subquery),
table_alias_prefix=(table_alias_prefix +
- ('t' if use_correlated_subquery else '')))
+ ('t' if use_correlated_subquery else '')),
+ allow_with_clause=self.profile.use_nested_with())
if use_scalar_subquery and not use_agg_subquery:
# Impala will assume the query will return more than one row unless a LIMIT 1
# is added. An ORDER BY will also be added under the assumption that we want
@@ -1237,3 +1241,31 @@ def _group_agg_funcs_by_expr(self, val_expr):
# else: The remaining case could happen if the original expr was something like
# "SUM(a) + b + 1" where b is a GROUP BY field.
return exprs_to_funcs
+
+if __name__ == '__main__':
+ '''Generate some queries for manual inspection. The query won't run anywhere because the
+ tables used are fake. To make real queries, we'd need to connect to a database and
+ read the table metadata and such.
+ '''
+ tables = list()
+ data_types = list(TYPES)
+ data_types.remove(Float)
+ for table_idx in xrange(5):
+ table = Table('table_%s' % table_idx)
+ tables.append(table)
+ for col_idx in xrange(3):
+ col_type = choice(data_types)
+ col = Column(table, '%s_col_%s' % (col_type.__name__.lower(), col_idx), col_type)
+ table.cols.append(col)
+
+ query_profile = HiveProfile()
+ query_generator = QueryGenerator(query_profile)
+ from model_translator import SqlWriter
+ sql_writer = SqlWriter.create(dialect='HIVE')
+ ref_writer = SqlWriter.create(dialect='POSTGRESQL', nulls_order_asc=query_profile.nulls_order_asc())
+ for _ in range(3000):
+ query = query_generator.create_query(tables)
+ print("Test db")
+ print(sql_writer.write_query(query) + '\n')
+ print("Ref db")
+ print(ref_writer.write_query(query) + '\n')
@@ -322,6 +322,19 @@ def use_union_all(self):
def get_query_execution(self):
return self._choose_from_weights('QUERY_EXECUTION')
+ def use_having_without_groupby(self):
+ return True
+
+ def use_nested_with(self):
+ return True
+
+ # Workaround for Hive null ordering differences, and lack of 'NULLS FIRST', 'NULLS LAST'
+ # specifications. The ref db will order nulls as specified for ASC sorting to make it
+ # identical to Hive. Valid return values are: 'BEFORE', 'AFTER', or 'DEFAULT',
+ # the latter means no specification needed.
+ def nulls_order_asc(self):
+ return 'DEFAULT'
+
def choose_val_expr(self, val_exprs, types=TYPES):
if not val_exprs:
raise Exception('At least on value is required')
@@ -447,5 +460,34 @@ def allow_func_signature(self, signature):
return True
+class HiveProfile(DefaultProfile):
+ def __init__(self):
+ super(HiveProfile, self).__init__()
+
+ def use_having_without_groupby(self):
+ return False
+
+ def use_nested_with(self):
+ return False
+
+ def nulls_order_asc(self):
+ return 'BEFORE'
+
+ def allow_func_signature(self, signature):
+ if signature.func._NAME.startswith('DateAdd'):
+ return False
+ if signature.func._NAME in ('Greatest', 'Least'):
+ type = signature.return_type
+ argtypes = [arg.type for arg in signature.args]
+ for argtype in argtypes:
+ if type is None:
+ type = argtype
+ continue
+ else:
+ if type != argtype:
+ return False
+ return DefaultProfile.allow_func_signature(self, signature)
+
+
PROFILES = [var for var in locals().values()
if isinstance(var, type) and var.__name__.endswith('Profile')]

0 comments on commit 6dad2e8

Please sign in to comment.