Skip to content

Commit

Permalink
adding unit test for impala
Browse files Browse the repository at this point in the history
  • Loading branch information
agl29 authored and romainr committed May 28, 2021
1 parent 51df6bd commit 95aab93
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 35 deletions.
9 changes: 9 additions & 0 deletions apps/beeswax/data/tables/flights.csv
@@ -0,0 +1,9 @@
"date","hour","minute","dep","arr","dep_delay","arr_delay","carrier","flight","dest","plane","cancelled","time","dist"
2011-12-14 12:00:00,13,4,1304,1704,24,14,"WN",3085,"PHL","N524SW",true,159,1336
2011-12-14 12:00:00,17,52,1752,1943,12,8,"WN",39,"PHX","N503SW",true,155,1020
2011-12-14 12:00:00,7,9,709,853,-1,-12,"WN",424,"PHX","N761RR",true,152,1020
2011-12-14 12:00:00,13,32,1332,1514,17,4,"WN",1098,"PHX","N941WN",true,151,1020
2011-12-14 12:00:00,9,55,955,1141,5,-4,"WN",1403,"PHX","N472WN",true,155,1020
2011-12-14 12:00:00,16,13,1613,1731,8,-4,"WN",33,"SAN","N707SA",true,185,1313
2011-12-14 12:00:00,11,45,1145,1257,5,-13,"WN",1212,"SAN","N279WN",false,183,1313
2011-12-14 12:00:00,20,16,2016,2112,36,32,"WN",207,"SAT","N929WN",false,44,192
56 changes: 21 additions & 35 deletions desktop/libs/indexer/src/indexer/indexers/sql.py
Expand Up @@ -283,7 +283,7 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
is_task=True
)

def bool_col_update(self, row, columns):
def nomalize_booleans(self, row, columns):
for cnt, col in enumerate(columns):
if col['type'] == 'boolean':
if row[cnt] in ('T', 't', 'true', 'True', 'TRUE', '1'):
Expand Down Expand Up @@ -352,23 +352,21 @@ def create_table_from_local_file(self, source, destination, start_time=-1):
path = urllib_unquote(source['path'])

if path: # data insertion

with open(BASE_DIR + path, 'r') as local_file:
reader = csv.reader(local_file)
csv_rows = []
_csv_rows = []

for count, row in enumerate(reader):
if source['format']['hasHeader'] and count == 0:
continue
if editor_type == 'impala': # for the boolean col updating csv_val to (1,0)
row = self.bool_col_update(row, columns)
csv_rows.append(tuple(row))
if dialect == 'impala': # for the boolean col updating csv_val to (1,0)
row = self.nomalize_booleans(row, columns)
_csv_rows.append(tuple(row))

if csv_rows:
csv_rows = str(csv_rows)
csv_rows = csv_rows[1:-1]
if _csv_rows:
csv_rows = str(_csv_rows)[1:-1]

if editor_type in ('hive', 'mysql'):
if dialect in ('hive', 'mysql'):
sql += '''\nINSERT INTO %(database)s.%(table_name)s VALUES %(csv_rows)s;
'''% {
'database': database,
Expand All @@ -381,36 +379,24 @@ def create_table_from_local_file(self, source, destination, start_time=-1):
else '{0}'.format(col_val) for count, col_val in enumerate(csv_row)])

sql += '''\nUPSERT INTO %(database)s.%(table_name)s VALUES (%(csv_row)s);
''' % {
'database': database,
'table_name': table_name,
'csv_row': _sql
}
elif editor_type == 'impala':
sql += '''\nINSERT INTO %(database)s.%(table_name)s_tmp VALUES %(csv_rows)s;\n\nCREATE TABLE IF NOT EXISTS %(database)s.%(table_name)s
AS SELECT'''% {
''' % {
'database': database,
'table_name': table_name,
'csv_rows': csv_rows
'csv_row': _sql
}
elif dialect == 'impala':
# casting from string to boolean is not allowed in impala so string -> int -> bool
sql_ = ',\n'.join([
' CAST ( `%(name)s` AS %(type)s ) `%(name)s`' % col if col['type'] != 'boolean' \
else ' CAST ( CAST ( `%(name)s` AS TINYINT ) AS boolean ) `%(name)s`' % col for col in columns
])

for count, col in enumerate(columns):
if col['type'] == 'boolean': # casting from string to boolean is not allowed in impala so string -> int -> bool
sql += '''\n CAST ( CAST ( `%(col_name)s` AS TINYINT ) AS boolean ) `%(col_name)s`'''%{
'col_name': col['name']
}
else:
sql += '''\n CAST ( `%(col_name)s` AS %(col_type)s ) `%(col_name)s`'''%{
'col_name': col['name'],
'col_type': col['type']
}
if count != len(columns)-1:
sql += ','

sql += '''\nFROM %(database)s.%(table_name)s_tmp;\n\nDROP TABLE IF EXISTS %(database)s.%(table_name)s_tmp;
'''% {
sql += '''\nINSERT INTO %(database)s.%(table_name)s_tmp VALUES %(csv_rows)s;\n\nCREATE TABLE IF NOT EXISTS %(database)s.%(table_name)s
AS SELECT\n%(sql_)s\nFROM %(database)s.%(table_name)s_tmp;\n\nDROP TABLE IF EXISTS %(database)s.%(table_name)s_tmp;'''% {
'database': database,
'table_name': table_name,
'csv_rows': csv_rows,
'sql_': sql_
}

on_success_url = reverse('metastore:describe_table', kwargs={'database': database, 'table': final_table_name}) + \
Expand Down Expand Up @@ -477,4 +463,4 @@ def _create_table_from_local(request, source, destination, start_time=-1):
if request.POST.get('show_command'):
return {'status': 0, 'commands': notebook.get_str()}
else:
return notebook.execute(request, batch=False)
return notebook.execute(request, batch=False)
82 changes: 82 additions & 0 deletions desktop/libs/indexer/src/indexer/indexers/sql_tests.py
Expand Up @@ -920,3 +920,85 @@ def test_create_table_from_local_phoenix():
UPSERT INTO default.test1 VALUES ('CA', 'San Jose', 912332);'''

assert_equal(statement, sql)


def test_create_table_from_local_impala():
with patch('indexer.indexers.sql.get_interpreter') as get_interpreter:
get_interpreter.return_value = {'Name': 'Impala', 'dialect': 'impala'}
source = {
'path': '/apps/beeswax/data/tables/flights.csv',
'sourceType': 'impala',
'format': {'hasHeader': True}
}
destination = {
'name': 'default.test1',
'columns': [
{'name': 'date', 'type': 'timestamp'},
{'name': 'hour', 'type': 'bigint'},
{'name': 'minute', 'type': 'bigint'},
{'name': 'dep', 'type': 'bigint'},
{'name': 'arr', 'type': 'bigint'},
{'name': 'dep_delay', 'type': 'bigint'},
{'name': 'arr_delay', 'type': 'bigint'},
{'name': 'carrier', 'type': 'string'},
{'name': 'flight', 'type': 'bigint'},
{'name': 'dest', 'type': 'string'},
{'name': 'plane', 'type': 'string'},
{'name': 'cancelled', 'type': 'boolean'},
{'name': 'time', 'type': 'bigint'},
{'name': 'dist', 'type': 'bigint'},
],
'sourceType': 'impala'
}
request = MockRequest(fs=MockFs())
sql = SQLIndexer(user=request.user, fs=request.fs).create_table_from_local_file(source, destination).get_str()

statement = '''USE default;
CREATE TABLE IF NOT EXISTS default.test1_tmp (
`date` string,
`hour` string,
`minute` string,
`dep` string,
`arr` string,
`dep_delay` string,
`arr_delay` string,
`carrier` string,
`flight` string,
`dest` string,
`plane` string,
`cancelled` string,
`time` string,
`dist` string);
INSERT INTO default.test1_tmp VALUES \
('2011-12-14 12:00:00', '13', '4', '1304', '1704', '24', '14', 'WN', '3085', 'PHL', 'N524SW', '1', '159', '1336'), \
('2011-12-14 12:00:00', '17', '52', '1752', '1943', '12', '8', 'WN', '39', 'PHX', 'N503SW', '1', '155', '1020'), \
('2011-12-14 12:00:00', '7', '9', '709', '853', '-1', '-12', 'WN', '424', 'PHX', 'N761RR', '1', '152', '1020'), \
('2011-12-14 12:00:00', '13', '32', '1332', '1514', '17', '4', 'WN', '1098', 'PHX', 'N941WN', '1', '151', '1020'), \
('2011-12-14 12:00:00', '9', '55', '955', '1141', '5', '-4', 'WN', '1403', 'PHX', 'N472WN', '1', '155', '1020'), \
('2011-12-14 12:00:00', '16', '13', '1613', '1731', '8', '-4', 'WN', '33', 'SAN', 'N707SA', '1', '185', '1313'), \
('2011-12-14 12:00:00', '11', '45', '1145', '1257', '5', '-13', 'WN', '1212', 'SAN', 'N279WN', '0', '183', '1313'), \
('2011-12-14 12:00:00', '20', '16', '2016', '2112', '36', '32', 'WN', '207', 'SAT', 'N929WN', '0', '44', '192');
CREATE TABLE IF NOT EXISTS default.test1
AS SELECT
CAST ( `date` AS timestamp ) `date`,
CAST ( `hour` AS bigint ) `hour`,
CAST ( `minute` AS bigint ) `minute`,
CAST ( `dep` AS bigint ) `dep`,
CAST ( `arr` AS bigint ) `arr`,
CAST ( `dep_delay` AS bigint ) `dep_delay`,
CAST ( `arr_delay` AS bigint ) `arr_delay`,
CAST ( `carrier` AS string ) `carrier`,
CAST ( `flight` AS bigint ) `flight`,
CAST ( `dest` AS string ) `dest`,
CAST ( `plane` AS string ) `plane`,
CAST ( CAST ( `cancelled` AS TINYINT ) AS boolean ) `cancelled`,
CAST ( `time` AS bigint ) `time`,
CAST ( `dist` AS bigint ) `dist`
FROM default.test1_tmp;
DROP TABLE IF EXISTS default.test1_tmp;'''

assert_equal(statement, sql)

0 comments on commit 95aab93

Please sign in to comment.