Skip to content

Commit

Permalink
feat: Add parquet upload (#14449)
Browse files Browse the repository at this point in the history
* allow csv upload to accept parquet file

* fix mypy

* fix if statement

* add test for specifying columns in CSV upload

* clean up test

* change order in test

* fix failures

* upload parquet to separate table in test

* fix error message

* fix mypy again

* rename other extensions to columnar

* add new form for columnar upload

* add support for zip files

* undo csv form changes except usecols

* add more tests for zip

* isort & black

* pylint

* fix trailing space

* address more review comments

* pylint

* black

* resolve remaining issues
  • Loading branch information
exemplary-citizen committed Aug 31, 2021
1 parent ad8336a commit d25b096
Show file tree
Hide file tree
Showing 10 changed files with 493 additions and 10 deletions.
3 changes: 2 additions & 1 deletion superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,8 @@ def _try_json_readsha(filepath: str, length: int) -> Optional[str]:
# Allowed format types for upload on Database view
EXCEL_EXTENSIONS = {"xlsx", "xls"}
CSV_EXTENSIONS = {"csv", "tsv", "txt"}
ALLOWED_EXTENSIONS = {*EXCEL_EXTENSIONS, *CSV_EXTENSIONS}
COLUMNAR_EXTENSIONS = {"parquet", "zip"}
ALLOWED_EXTENSIONS = {*EXCEL_EXTENSIONS, *CSV_EXTENSIONS, *COLUMNAR_EXTENSIONS}

# CSV Options: key/value pairs that will be passed as argument to DataFrame.to_csv
# method.
Expand Down
17 changes: 16 additions & 1 deletion superset/initialization/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def init_views(self) -> None:
DashboardModelViewAsync,
)
from superset.views.database.views import (
ColumnarToDatabaseView,
CsvToDatabaseView,
DatabaseView,
ExcelToDatabaseView,
Expand Down Expand Up @@ -281,6 +282,7 @@ def init_views(self) -> None:
appbuilder.add_view_no_menu(CssTemplateAsyncModelView)
appbuilder.add_view_no_menu(CsvToDatabaseView)
appbuilder.add_view_no_menu(ExcelToDatabaseView)
appbuilder.add_view_no_menu(ColumnarToDatabaseView)
appbuilder.add_view_no_menu(Dashboard)
appbuilder.add_view_no_menu(DashboardModelViewAsync)
appbuilder.add_view_no_menu(Datasource)
Expand Down Expand Up @@ -371,7 +373,20 @@ def init_views(self) -> None:
)
),
)

appbuilder.add_link(
"Upload a Columnar file",
label=__("Upload a Columnar file"),
href="/columnartodatabaseview/form",
icon="fa-upload",
category="Data",
category_label=__("Data"),
category_icon="fa-wrench",
cond=lambda: bool(
self.config["COLUMNAR_EXTENSIONS"].intersection(
self.config["ALLOWED_EXTENSIONS"]
)
),
)
try:
import xlrd # pylint: disable=unused-import

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{#
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
#}
{% extends 'appbuilder/general/model/edit.html' %}

{% block tail_js %}
{{ super() }}
<script>
// jQuery handles for the form's database selector ("#con") and schema
// field ("#schema").
var db = $("#con"),
    schema = $("#schema");

// The schema field is initially a plain text input. Keep a copy of it so it
// can be put back after it has been swapped out for a dropdown.
var any_schema_is_allowed = schema.clone();

// Re-query the allowed schemas every time a different database is picked...
db.on("change", function() {
  update_schemas_allowed_for_columnar_upload(db.val());
});

// ...and once up front for the database that is selected on page load.
update_schemas_allowed_for_columnar_upload(db.val());

/**
 * Ask the server which schemas may receive an upload for the given database
 * and update the schema field of the form accordingly.
 *
 * On success the response payload is handed to
 * change_schema_field_in_formview(). On failure an alert is shown.
 *
 * @param db_id  Value of the database selector; sent as the `db_id` query
 *               parameter.
 */
function update_schemas_allowed_for_columnar_upload(db_id) {
  $.ajax({
    method: "GET",
    url: "/superset/schemas_access_for_file_upload",
    data: {db_id: db_id},
    dataType: 'json',
    contentType: "application/json; charset=utf-8"
  }).done(function(data) {
    change_schema_field_in_formview(data);
  }).fail(function(error) {
    // responseJSON is only populated when the server answered with a JSON
    // body. For anything else (HTML error page, network failure, ...) fall
    // back to the HTTP status text so the handler itself never throws.
    var errorMsg =
      (error && error.responseJSON && error.responseJSON.error) ||
      (error && error.statusText) ||
      "Unknown error";
    alert("ERROR: " + errorMsg);
  });
}

/**
 * Swap the "#schema" form field to match the schemas the user may upload to.
 *
 * - Non-empty list: the field becomes a required <select> with one <option>
 *   per schema.
 * - Empty / missing list: the field is restored to the saved free-text input
 *   (`any_schema_is_allowed`).
 *
 * @param schemas_allowed  Array of schema names returned by the server.
 */
function change_schema_field_in_formview(schemas_allowed){
  if (schemas_allowed && schemas_allowed.length > 0) {
    // Build the dropdown through the DOM API rather than by concatenating
    // HTML, so schema names containing quotes, "<" or "&" are treated as
    // plain text and cannot break or inject markup.
    var dropdown_schema_list = $('<select>')
      .attr({id: 'schema', name: 'schema'})
      .prop('required', true);
    schemas_allowed.forEach(function(schema_allowed) {
      dropdown_schema_list.append(
        $('<option>').attr('value', schema_allowed).text(schema_allowed)
      );
    });
    $("#schema").replaceWith(dropdown_schema_list);
  } else {
    $("#schema").replaceWith(any_schema_is_allowed);
  }
}
</script>
{% endblock %}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
function update_schemas_allowed_for_csv_upload(db_id) {
$.ajax({
method: "GET",
url: "/superset/schemas_access_for_csv_upload",
url: "/superset/schemas_access_for_file_upload",
data: {db_id: db_id},
dataType: 'json',
contentType: "application/json; charset=utf-8"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
function update_schemas_allowed_for_excel_upload(db_id) {
$.ajax({
method: "GET",
url: "/superset/schemas_access_for_excel_upload",
url: "/superset/schemas_access_for_file_upload",
data: {db_id: db_id},
dataType: 'json',
contentType: "application/json; charset=utf-8"
Expand Down
6 changes: 3 additions & 3 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3072,11 +3072,11 @@ def sqllab_history(self) -> FlaskResponse:
@api
@has_access_api
@event_logger.log_this
@expose("/schemas_access_for_csv_upload")
def schemas_access_for_csv_upload(self) -> FlaskResponse:
@expose("/schemas_access_for_file_upload")
def schemas_access_for_file_upload(self) -> FlaskResponse:
"""
This method exposes an API endpoint to
get the schema access control settings for csv upload in this database
get the schema access control settings for file upload in this database
"""
if not request.args.get("db_id"):
return json_error_response("No database is allowed for your csv upload")
Expand Down
144 changes: 143 additions & 1 deletion superset/views/database/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
from flask_appbuilder.forms import DynamicForm
from flask_babel import lazy_gettext as _
from flask_wtf.file import FileAllowed, FileField, FileRequired
from wtforms import BooleanField, IntegerField, SelectField, StringField
from wtforms import (
BooleanField,
IntegerField,
MultipleFileField,
SelectField,
StringField,
)
from wtforms.ext.sqlalchemy.fields import QuerySelectField
from wtforms.validators import DataRequired, Length, NumberRange, Optional

Expand Down Expand Up @@ -163,6 +169,15 @@ def at_least_one_schema_is_allowed(database: Database) -> bool:
_("Mangle Duplicate Columns"),
description=_('Specify duplicate columns as "X.0, X.1".'),
)
usecols = JsonListField(
_("Use Columns"),
default=None,
description=_(
"Json list of the column names that should be read. "
"If not None, only these columns will be read from the file."
),
validators=[Optional()],
)
skipinitialspace = BooleanField(
_("Skip Initial Space"), description=_("Skip spaces after delimiter.")
)
Expand Down Expand Up @@ -402,3 +417,130 @@ def at_least_one_schema_is_allowed(database: Database) -> bool:
'Use [""] for empty string.'
),
)


class ColumnarToDatabaseForm(DynamicForm):
    """Form describing the upload of one or more columnar files into a table.

    Field declaration order is significant: it is the order in which the
    fields are presented, so it must not be rearranged.
    """

    # pylint: disable=E0211
    def columnar_allowed_dbs() -> List[Database]:  # type: ignore
        """Return the databases that may be offered as upload targets.

        Called without arguments as the ``query_factory`` of the ``con``
        field, hence the missing ``self``. A database qualifies when it has
        ``allow_csv_upload`` set and ``at_least_one_schema_is_allowed``
        accepts it.
        """
        # TODO: change allow_csv_upload to allow_file_upload
        upload_enabled = db.session.query(Database).filter_by(allow_csv_upload=True)
        return list(
            filter(
                ColumnarToDatabaseForm.at_least_one_schema_is_allowed,
                upload_enabled.all(),
            )
        )

    @staticmethod
    def at_least_one_schema_is_allowed(database: Database) -> bool:
        """Tell whether the current user may upload columnar data to *database*.

        The decision is made in two steps:

        1. A user with access to the database is always allowed. With an
           empty ``schemas_allowed_for_csv_upload`` list they may upload to
           any schema (or without a schema if the database has none); with a
           non-empty list they may upload to the listed schemas.
        2. A user without access to the database is allowed only when
           ``schemas_allowed_for_csv_upload`` is non-empty and at least one
           of the listed schemas is accessible to them. With an empty list
           such a user cannot upload at all.

        A non-empty allow-list on a database that does not support schemas
        is treated as a misconfiguration by the original authors; uploads
        are expected to fail in that case.
        """
        if security_manager.can_access_database(database):
            return True

        upload_schemas = database.get_schema_access_for_csv_upload()
        return bool(
            upload_schemas
            and security_manager.schemas_accessible_by_user(
                database, upload_schemas, False
            )
        )

    # Name of the destination table.
    name = StringField(
        label=_("Table Name"),
        validators=[DataRequired()],
        description=_("Name of table to be created from columnar data."),
        widget=BS3TextFieldWidget(),
    )

    # The uploaded file(s); extensions are limited to the columnar ones that
    # are also globally allowed.
    columnar_file = MultipleFileField(
        label=_("Columnar File"),
        validators=[
            DataRequired(),
            FileAllowed(
                config["ALLOWED_EXTENSIONS"].intersection(
                    config["COLUMNAR_EXTENSIONS"]
                ),
                _(
                    "Only the following file extensions are allowed: "
                    "%(allowed_extensions)s",
                    allowed_extensions=", ".join(
                        config["ALLOWED_EXTENSIONS"].intersection(
                            config["COLUMNAR_EXTENSIONS"]
                        )
                    ),
                ),
            ),
        ],
        description=_("Select a Columnar file to be uploaded to a database."),
    )

    # Target database, chosen among those returned by columnar_allowed_dbs().
    con = QuerySelectField(
        label=_("Database"),
        query_factory=columnar_allowed_dbs,
        get_pk=lambda database: database.id,
        get_label=lambda database: database.database_name,
    )

    # Optional target schema.
    schema = StringField(
        label=_("Schema"),
        validators=[Optional()],
        description=_("Specify a schema (if database flavor supports this)."),
        widget=BS3TextFieldWidget(),
    )

    # What to do when the destination table already exists.
    if_exists = SelectField(
        label=_("Table Exists"),
        validators=[DataRequired()],
        choices=[
            ("fail", _("Fail")),
            ("replace", _("Replace")),
            ("append", _("Append")),
        ],
        description=_(
            "If table exists do one of the following: "
            "Fail (do nothing), Replace (drop and recreate table) "
            "or Append (insert data)."
        ),
    )

    # Optional JSON list restricting which columns are read from the file.
    usecols = JsonListField(
        label=_("Use Columns"),
        validators=[Optional()],
        default=None,
        description=_(
            "Json list of the column names that should be read. "
            "If not None, only these columns will be read from the file."
        ),
    )

    # Whether the dataframe index is written out as a column.
    index = BooleanField(
        label=_("Dataframe Index"),
        description=_("Write dataframe index as a column."),
    )

    # Optional label(s) for the index column(s).
    index_label = StringField(
        label=_("Column Label(s)"),
        validators=[Optional()],
        description=_(
            "Column label for index column(s). If None is given "
            "and Dataframe Index is True, Index Names are used."
        ),
        widget=BS3TextFieldWidget(),
    )

0 comments on commit d25b096

Please sign in to comment.