Skip to content

Commit

Permalink
chore: clean up csv tests (apache#10556)
Browse files Browse the repository at this point in the history
* Clean up csv tests

* Update tests/base_tests.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* Update tests/base_tests.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* import optional

* Fix mypy error

Co-authored-by: bogdan kyryliuk <bogdankyryliuk@dropbox.com>
Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>
  • Loading branch information
3 people authored and auxten committed Nov 20, 2020
1 parent 2171e5d commit 379f4af
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 230 deletions.
2 changes: 1 addition & 1 deletion superset/tasks/slack_util.py
Expand Up @@ -26,7 +26,7 @@
from superset import app

# Globals
config = app.config # type: ignore
config = app.config
logger = logging.getLogger("tasks.slack_util")


Expand Down
4 changes: 4 additions & 0 deletions superset/utils/core.py
Expand Up @@ -1029,6 +1029,10 @@ def get_main_database() -> "Database":
return get_or_create_db("main", db_uri)


def backend() -> str:
    """Return the ``backend`` attribute of the example database.

    Thin convenience wrapper so callers (e.g. tests) don't have to spell
    out the ``get_example_database()`` lookup themselves.
    """
    example_db = get_example_database()
    return example_db.backend


def is_adhoc_metric(metric: Metric) -> bool:
return bool(
isinstance(metric, dict)
Expand Down
41 changes: 28 additions & 13 deletions tests/base_tests.py
Expand Up @@ -18,7 +18,7 @@
"""Unit tests for Superset"""
import imp
import json
from typing import Any, Dict, Union, List
from typing import Any, Dict, Union, List, Optional
from unittest.mock import Mock, patch

import pandas as pd
Expand All @@ -44,6 +44,31 @@
FAKE_DB_NAME = "fake_db_100"


def login(client: Any, username: str = "admin", password: str = "general"):
    """Log *client* in through the /login/ form.

    Fails (via ``assert``) if the server responds with a user-confirmation
    page instead of completing the login.
    """
    credentials = dict(username=username, password=password)
    body = get_resp(client, "/login/", data=credentials)
    assert "User confirmation needed" not in body


def get_resp(
    client: Any,
    url: str,
    data: Any = None,
    follow_redirects: bool = True,
    raise_on_error: bool = True,
    json_: Optional[str] = None,
):
    """Shortcut to get the parsed results while following redirects"""
    # Precedence: form data wins over json payload; neither means a GET.
    if data:
        response = client.post(url, data=data, follow_redirects=follow_redirects)
    elif json_:
        response = client.post(url, json=json_, follow_redirects=follow_redirects)
    else:
        response = client.get(url, follow_redirects=follow_redirects)
    # NOTE(review): strictly-greater comparison lets a 400 response through
    # without raising -- confirm this is intentional.
    failed = raise_on_error and response.status_code > 400
    if failed:
        raise Exception("http request failed with code {}".format(response.status_code))
    return response.data.decode("utf-8")


class SupersetTestCase(TestCase):

default_schema_backend_map = {
Expand Down Expand Up @@ -145,8 +170,7 @@ def get_or_create(self, cls, criteria, session, **kwargs):
return obj

def login(self, username="admin", password="general"):
resp = self.get_resp("/login/", data=dict(username=username, password=password))
self.assertNotIn("User confirmation needed", resp)
return login(self.client, username, password)

def get_slice(
self, slice_name: str, session: Session, expunge_from_session: bool = True
Expand Down Expand Up @@ -189,16 +213,7 @@ def get_datasource_mock() -> BaseDatasource:
def get_resp(
    self, url, data=None, follow_redirects=True, raise_on_error=True, json_=None
):
    """Shortcut to get the parsed results while following redirects.

    Delegates to the module-level ``get_resp()`` helper. The inline
    request/decode body left behind by the refactor ended in a ``return``,
    which made the delegation line unreachable -- it is removed here.
    """
    return get_resp(self.client, url, data, follow_redirects, raise_on_error, json_)

def get_json_resp(
self, url, data=None, follow_redirects=True, raise_on_error=True, json_=None
Expand Down
216 changes: 0 additions & 216 deletions tests/core_tests.py
Expand Up @@ -845,222 +845,6 @@ def enable_csv_upload(self, database: models.Database) -> None:
form_get = self.get_resp("/csvtodatabaseview/form")
self.assertIn("CSV to Database configuration", form_get)

def upload_csv(
    self, filename: str, table_name: str, extra: Optional[Dict[str, str]] = None
):
    """POST *filename* to the CSV-to-database upload form.

    :param filename: path of the CSV file to upload
    :param table_name: destination table name
    :param extra: optional form fields that override/extend the defaults
        (e.g. ``{"if_exists": "append", "schema": ...}``)
    :returns: the decoded response body
    """
    # Use a context manager so the file handle is closed after the request
    # instead of being leaked (the original never closed it).
    with open(filename, "rb") as csv_file:
        form_data = {
            "csv_file": csv_file,
            "sep": ",",
            "name": table_name,
            "con": utils.get_example_database().id,
            "if_exists": "fail",
            "index_label": "test_label",
            "mangle_dupe_cols": False,
        }
        if extra:
            form_data.update(extra)
        return self.get_resp("/csvtodatabaseview/form", data=form_data)

def upload_excel(
    self, filename: str, table_name: str, extra: Optional[Dict[str, str]] = None
):
    """POST *filename* to the Excel-to-database upload form.

    :param filename: path of the .xlsx file to upload
    :param table_name: destination table name
    :param extra: optional form fields that override/extend the defaults
        (e.g. ``{"if_exists": "replace"}``)
    :returns: the decoded response body
    """
    # Use a context manager so the file handle is closed after the request
    # instead of being leaked (the original never closed it).
    with open(filename, "rb") as excel_file:
        form_data = {
            "excel_file": excel_file,
            "name": table_name,
            "con": utils.get_example_database().id,
            "sheet_name": "Sheet1",
            "if_exists": "fail",
            "index_label": "test_label",
            "mangle_dupe_cols": False,
        }
        if extra:
            form_data.update(extra)
        return self.get_resp("/exceltodatabaseview/form", data=form_data)

@mock.patch(
    "superset.models.core.config",
    {**app.config, "ALLOWED_USER_CSV_SCHEMA_FUNC": lambda d, u: ["admin_database"]},
)
def test_import_csv_enforced_schema(self):
    """CSV uploads must honour ALLOWED_USER_CSV_SCHEMA_FUNC: only the
    schema(s) returned by that hook (here ``admin_database``) are accepted;
    a missing or non-matching schema is rejected."""
    if utils.get_example_database().backend == "sqlite":
        # sqlite doesn't support schema / database creation
        return
    self.login(username="admin")
    # Random table name avoids collisions between test runs.
    table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))
    full_table_name = f"admin_database.{table_name}"
    filename = "testCSV.csv"
    self.create_sample_csvfile(filename, ["a,b", "john,1", "paul,2"])
    try:
        self.enable_csv_upload(utils.get_example_database())

        # no schema specified, fail upload
        resp = self.upload_csv(filename, table_name)
        self.assertIn(
            'Database "examples" schema "None" is not allowed for csv uploads', resp
        )

        # user specified schema matches the expected schema, append
        # NOTE(review): the success message interpolates "(unknown)" as the
        # file name -- confirm this matches the view's actual message.
        success_msg = f'CSV file "(unknown)" uploaded to table "{full_table_name}"'
        resp = self.upload_csv(
            filename,
            table_name,
            extra={"schema": "admin_database", "if_exists": "append"},
        )
        self.assertIn(success_msg, resp)

        resp = self.upload_csv(
            filename,
            table_name,
            extra={"schema": "admin_database", "if_exists": "replace"},
        )
        self.assertIn(success_msg, resp)

        # user specified schema doesn't match, fail
        resp = self.upload_csv(filename, table_name, extra={"schema": "gold"})
        self.assertIn(
            'Database "examples" schema "gold" is not allowed for csv uploads',
            resp,
        )
    finally:
        # Always remove the temp CSV; the uploaded table itself is not
        # dropped here.
        os.remove(filename)

def test_import_csv_explore_database(self):
    """When the upload database's ``extra`` sets ``explore_database_id``,
    the table created by a CSV upload is attached to that explore database
    rather than the database the file was uploaded through."""
    if utils.get_example_database().backend == "sqlite":
        # sqlite doesn't support schema / database creation
        return
    explore_db_id = utils.get_example_database().id

    upload_db = utils.get_or_create_db(
        "csv_explore_db", app.config["SQLALCHEMY_EXAMPLES_URI"]
    )
    upload_db_id = upload_db.id
    # Point the upload database's extra at the examples DB for exploration.
    extra = upload_db.get_extra()
    extra["explore_database_id"] = explore_db_id
    upload_db.extra = json.dumps(extra)
    db.session.commit()

    self.login(username="admin")
    self.enable_csv_upload(DatasetDAO.get_database_by_id(upload_db_id))
    table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))

    f = "testCSV.csv"
    self.create_sample_csvfile(f, ["a,b", "john,1", "paul,2"])
    # initial upload with fail mode
    resp = self.upload_csv(f, table_name)
    self.assertIn(f'CSV file "{f}" uploaded to table "{table_name}"', resp)
    table = self.get_table_by_name(table_name)
    # The table must belong to the explore database, not the upload one.
    self.assertEqual(table.database_id, explore_db_id)

    # cleanup
    # NOTE(review): no try/finally here -- an assertion failure above leaks
    # the uploaded table, the csv_explore_db database, and the temp file.
    db.session.delete(table)
    db.session.delete(DatasetDAO.get_database_by_id(upload_db_id))
    db.session.commit()
    os.remove(f)

def test_import_csv(self):
    """End-to-end CSV upload: fail/append/replace modes, schema change
    between files, and custom vs. default null_values handling."""
    self.login(username="admin")
    examples_db = utils.get_example_database()
    table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))

    f1 = "testCSV.csv"
    self.create_sample_csvfile(f1, ["a,b", "john,1", "paul,2"])
    f2 = "testCSV2.csv"
    # f2 has a different schema (b,c,d) and an empty trailing field.
    self.create_sample_csvfile(f2, ["b,c,d", "john,1,x", "paul,2,"])
    self.enable_csv_upload(examples_db)

    try:
        success_msg_f1 = f'CSV file "{f1}" uploaded to table "{table_name}"'

        # initial upload with fail mode
        resp = self.upload_csv(f1, table_name)
        self.assertIn(success_msg_f1, resp)

        # upload again with fail mode; should fail
        fail_msg = f'Unable to upload CSV file "{f1}" to table "{table_name}"'
        resp = self.upload_csv(f1, table_name)
        self.assertIn(fail_msg, resp)

        # upload again with append mode
        resp = self.upload_csv(f1, table_name, extra={"if_exists": "append"})
        self.assertIn(success_msg_f1, resp)

        # upload again with replace mode
        resp = self.upload_csv(f1, table_name, extra={"if_exists": "replace"})
        self.assertIn(success_msg_f1, resp)

        # try to append to table from file with different schema
        resp = self.upload_csv(f2, table_name, extra={"if_exists": "append"})
        fail_msg_f2 = f'Unable to upload CSV file "{f2}" to table "{table_name}"'
        self.assertIn(fail_msg_f2, resp)

        # replace table from file with different schema
        resp = self.upload_csv(f2, table_name, extra={"if_exists": "replace"})
        success_msg_f2 = f'CSV file "{f2}" uploaded to table "{table_name}"'
        self.assertIn(success_msg_f2, resp)

        table = self.get_table_by_name(table_name)
        # make sure the new column name is reflected in the table metadata
        self.assertIn("d", table.column_names)

        # null values are set
        self.upload_csv(
            f2,
            table_name,
            extra={"null_values": '["", "john"]', "if_exists": "replace"},
        )
        # make sure that john and empty string are replaced with None
        engine = examples_db.get_sqla_engine()
        data = engine.execute(f"SELECT * from {table_name}").fetchall()
        assert data == [(None, 1, "x"), ("paul", 2, None)]

        # default null values
        self.upload_csv(f2, table_name, extra={"if_exists": "replace"})
        # with the default null values only the empty field maps to None,
        # so "john" is preserved
        data = engine.execute(f"SELECT * from {table_name}").fetchall()
        assert data == [("john", 1, "x"), ("paul", 2, None)]

    finally:
        # Remove the temp files; the uploaded table itself is not dropped.
        os.remove(f1)
        os.remove(f2)

def test_import_excel(self):
    """End-to-end Excel upload: fail/append/replace modes and the
    resulting table contents (including the written index column)."""
    self.login(username="admin")
    table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))
    f1 = "testExcel.xlsx"
    self.create_sample_excelfile(f1, {"a": ["john", "paul"], "b": [1, 2]})
    self.enable_csv_upload(utils.get_example_database())

    try:
        success_msg_f1 = f'Excel file "{f1}" uploaded to table "{table_name}"'

        # initial upload with fail mode
        resp = self.upload_excel(f1, table_name)
        self.assertIn(success_msg_f1, resp)

        # upload again with fail mode; should fail
        fail_msg = f'Unable to upload Excel file "{f1}" to table "{table_name}"'
        resp = self.upload_excel(f1, table_name)
        self.assertIn(fail_msg, resp)

        # upload again with append mode
        resp = self.upload_excel(f1, table_name, extra={"if_exists": "append"})
        self.assertIn(success_msg_f1, resp)

        # upload again with replace mode
        resp = self.upload_excel(f1, table_name, extra={"if_exists": "replace"})
        self.assertIn(success_msg_f1, resp)

        # verify the uploaded rows; the leading 0/1 values are the index
        # written via index_label (see upload_excel's form defaults)
        data = (
            utils.get_example_database()
            .get_sqla_engine()
            .execute(f"SELECT * from {table_name}")
            .fetchall()
        )
        assert data == [(0, "john", 1), (1, "paul", 2)]
    finally:
        # Remove the temp file; the uploaded table itself is not dropped.
        os.remove(f1)

def test_dataframe_timezone(self):
tz = pytz.FixedOffset(60)
data = [
Expand Down

0 comments on commit 379f4af

Please sign in to comment.