Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[importer_direct_upload] supporting phoenix dialect for direct upload
  • Loading branch information
agl29 committed May 26, 2021
1 parent 45221a6 commit d041721
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 2 deletions.
6 changes: 6 additions & 0 deletions desktop/core/src/desktop/js/ko/components/ko.historyPanel.js
Expand Up @@ -189,6 +189,12 @@ class HistoryPanel {
snippetImage: window.STATIC_URLS['impala/art/icon_impala_48.png'],
sqlDialect: true
},
phoenix: {
placeHolder: I18n('Example: SELECT * FROM tablename, or press CTRL + space'),
aceMode: 'ace/mode/phoenix',
snippetImage: window.STATIC_URLS['beeswax/art/icon_beeswax_48.png'],
sqlDialect: true
},
java: {
snippetIcon: 'fa-file-code-o'
},
Expand Down
27 changes: 27 additions & 0 deletions desktop/libs/indexer/src/indexer/indexers/sql.py
Expand Up @@ -313,6 +313,22 @@ def create_table_from_local_file(self, source, destination, start_time=-1):
'columns': ',\n'.join([' `%(name)s` %(type)s' % col for col in columns]),
}

elif dialect == 'phoenix':

for col in columns:
if col['type'] == 'string':
col['type'] = 'CHAR(255)'

sql = '''CREATE TABLE IF NOT EXISTS %(database)s.%(table_name)s (
%(columns)s
CONSTRAINT my_pk PRIMARY KEY (%(primary_keys)s));
''' % {
'database': database,
'table_name': table_name,
'columns': ',\n'.join([' %(name)s %(type)s' % col for col in columns]),
'primary_keys': ', '.join(destination.get('indexerPrimaryKey'))
}

path = urllib_unquote(source['path'])

if path: # data insertion
Expand All @@ -332,6 +348,17 @@ def create_table_from_local_file(self, source, destination, start_time=-1):
'table_name': table_name,
'csv_rows': csv_rows
}
if dialect == 'phoenix':
for csv_row in list_of_tuples:
_sql = ', '.join([ "'{0}'".format(col_val) if columns[count]['type'] in ('CHAR(255)', 'timestamp') \
else '{0}'.format(col_val) for count, col_val in enumerate(csv_row)])

sql += '''\nUPSERT INTO %(database)s.%(table_name)s VALUES (%(csv_row)s);
''' % {
'database': database,
'table_name': table_name,
'csv_row': _sql
}

on_success_url = reverse('metastore:describe_table', kwargs={'database': database, 'table': final_table_name}) + \
'?source_type=' + source_type
Expand Down
11 changes: 11 additions & 0 deletions desktop/libs/indexer/src/indexer/templates/importer.mako
Expand Up @@ -554,6 +554,11 @@ ${ commonheader(_("Importer"), "indexer", user, request, "60px") | n,unicode }
<label for="dialectType" class="control-label "><div>${ _('Dialect') }</div>
<select id="dialectType" data-bind="selectize: $parent.createWizard.source.interpreters, value: $parent.createWizard.source.interpreter, optionsText: 'name', optionsValue: 'type'"></select>
</label>
<div data-bind="visible: dialect() == 'phoenix'">
<label for="PhoenixPks" class="control-label"><div>${ _('Primary key') }</div>
<select id="PhoenixPks" data-bind="selectize: columns, selectedOptions: indexerPrimaryKey, selectedObjects: indexerPrimaryKeyObject, optionsValue: 'name', optionsText: 'name', innerSubscriber: 'name'" size="1"></select>
</label>
</div>
</div>
<label for="collectionName" class="control-label "><div>${ _('Name') }</div></label>
<input type="text" class="input-xxlarge" data-bind="value: name, hiveChooser: name, namespace: namespace, compute: compute, skipColumns: true, skipTables: outputFormat() == 'database', valueUpdate: 'afterkeydown', apiHelperUser: '${ user }', apiHelperType: sourceType, mainScrollable: $(MAIN_SCROLLABLE), attr: { 'placeholder': outputFormat() == 'table' ? '${ _ko('Table name or <database>.<table>') }' : '${ _ko('Database name') }' }" pattern="^([a-zA-Z0-9_]+\.)?[a-zA-Z0-9_]*$" title="${ _('Only alphanumeric and underscore characters') }">
Expand Down Expand Up @@ -1800,6 +1805,11 @@ ${ commonheader(_("Importer"), "indexer", user, request, "60px") | n,unicode }
self.interpreter.subscribe(function(val) {
self.sourceType(val);
wizard.destination.sourceType(val);
for (let i = 0; i < self.interpreters().length; i++) {
if (val == self.interpreters()[i]['type']) {
wizard.destination.dialect(self.interpreters()[i]['dialect']);
}
}
});
// File
Expand Down Expand Up @@ -2204,6 +2214,7 @@ ${ commonheader(_("Importer"), "indexer", user, request, "60px") | n,unicode }
var self = this;
self.apiHelperType = vm.sourceType;
self.sourceType = ko.observable(vm.sourceType);
self.dialect = ko.observable('');
self.name = ko.observable('').extend({ throttle: 500 });
self.nameChanged = function(name) {
Expand Down
8 changes: 6 additions & 2 deletions desktop/libs/notebook/src/notebook/connectors/sql_alchemy.py
Expand Up @@ -239,7 +239,8 @@ def execute(self, notebook, snippet):

engine = self._get_engine()
connection = self._create_connection(engine)
statement = snippet['statement']
stmt_dict = self._get_current_statement(notebook, snippet)
statement = stmt_dict['statement']

if self.interpreter['dialect_properties'].get('trim_statement_semicolon', True):
statement = statement.strip().rstrip(';')
Expand Down Expand Up @@ -270,7 +271,7 @@ def execute(self, notebook, snippet):
}
CONNECTIONS[guid] = cache

return {
response = {
'sync': False,
'has_result_set': result.cursor != None,
'modified_row_count': 0,
Expand All @@ -282,6 +283,9 @@ def execute(self, notebook, snippet):
'type': 'table'
}
}
response.update(stmt_dict)

return response


@query_error_handler
Expand Down

0 comments on commit d041721

Please sign in to comment.