Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: clean up csv tests #10556

Merged
merged 5 commits into from Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion superset/tasks/slack_util.py
Expand Up @@ -26,7 +26,7 @@
from superset import app

# Globals
config = app.config # type: ignore
config = app.config
logger = logging.getLogger("tasks.slack_util")


Expand Down
4 changes: 4 additions & 0 deletions superset/utils/core.py
Expand Up @@ -1029,6 +1029,10 @@ def get_main_database() -> "Database":
return get_or_create_db("main", db_uri)


def backend() -> str:
    """Return the ``backend`` attribute of the examples database."""
    examples_db = get_example_database()
    return examples_db.backend


def is_adhoc_metric(metric: Metric) -> bool:
return bool(
isinstance(metric, dict)
Expand Down
41 changes: 28 additions & 13 deletions tests/base_tests.py
Expand Up @@ -18,7 +18,7 @@
"""Unit tests for Superset"""
import imp
import json
from typing import Any, Dict, Union, List
from typing import Any, Dict, Union, List, Optional
from unittest.mock import Mock, patch

import pandas as pd
Expand All @@ -44,6 +44,31 @@
FAKE_DB_NAME = "fake_db_100"


def login(client: Any, username: str = "admin", password: str = "general"):
    """Post the credentials to ``/login/`` through ``client``.

    Fails with ``AssertionError`` when the returned page contains the text
    "User confirmation needed".
    """
    credentials = {"username": username, "password": password}
    page = get_resp(client, "/login/", data=credentials)
    assert "User confirmation needed" not in page


def get_resp(
    client: Any,
    url: str,
    data: Any = None,
    follow_redirects: bool = True,
    raise_on_error: bool = True,
    json_: Optional[str] = None,
):
    """Issue a request through ``client`` and return the body decoded as UTF-8.

    A truthy ``data`` is POSTed as form data; otherwise a truthy ``json_`` is
    POSTed as JSON; otherwise the URL is fetched with GET.

    Raises:
        Exception: when ``raise_on_error`` is set and the response status
            code is greater than 400.
    """
    request_kwargs = {"follow_redirects": follow_redirects}
    if data:
        send = client.post
        request_kwargs["data"] = data
    elif json_:
        send = client.post
        request_kwargs["json"] = json_
    else:
        send = client.get
    response = send(url, **request_kwargs)

    if raise_on_error and response.status_code > 400:
        raise Exception(
            "http request failed with code {}".format(response.status_code)
        )
    body = response.data
    return body.decode("utf-8")


class SupersetTestCase(TestCase):

default_schema_backend_map = {
Expand Down Expand Up @@ -145,8 +170,7 @@ def get_or_create(self, cls, criteria, session, **kwargs):
return obj

def login(self, username="admin", password="general"):
# NOTE(review): this span is diff output. The next two lines appear to be the
# removed inline body and the final ``return`` the added delegation to the
# module-level ``login`` helper -- confirm against the merged file.
resp = self.get_resp("/login/", data=dict(username=username, password=password))
self.assertNotIn("User confirmation needed", resp)
return login(self.client, username, password)

def get_slice(
self, slice_name: str, session: Session, expunge_from_session: bool = True
Expand Down Expand Up @@ -189,16 +213,7 @@ def get_datasource_mock() -> BaseDatasource:
def get_resp(
self, url, data=None, follow_redirects=True, raise_on_error=True, json_=None
):
# NOTE(review): this span is diff output. The lines from the docstring through
# the inline ``return resp.data.decode("utf-8")`` appear to be the removed
# pre-change body, and the last line the added delegation to the module-level
# ``get_resp`` helper -- confirm against the merged file.
"""Shortcut to get the parsed results while following redirects"""
if data:
resp = self.client.post(url, data=data, follow_redirects=follow_redirects)
elif json_:
resp = self.client.post(url, json=json_, follow_redirects=follow_redirects)
else:
resp = self.client.get(url, follow_redirects=follow_redirects)
if raise_on_error and resp.status_code > 400:
raise Exception("http request failed with code {}".format(resp.status_code))
return resp.data.decode("utf-8")
return get_resp(self.client, url, data, follow_redirects, raise_on_error, json_)

def get_json_resp(
self, url, data=None, follow_redirects=True, raise_on_error=True, json_=None
Expand Down
216 changes: 0 additions & 216 deletions tests/core_tests.py
Expand Up @@ -845,222 +845,6 @@ def enable_csv_upload(self, database: models.Database) -> None:
form_get = self.get_resp("/csvtodatabaseview/form")
self.assertIn("CSV to Database configuration", form_get)

def upload_csv(
    self, filename: str, table_name: str, extra: Optional[Dict[str, str]] = None
):
    """Post ``filename`` to the CSV upload form and return the response.

    Args:
        filename: path of the CSV file to upload.
        table_name: value sent as the form's ``name`` field.
        extra: optional form fields that override or extend the defaults
            (for example ``{"if_exists": "append"}``).

    Returns:
        Whatever ``self.get_resp`` returns for the form submission.
    """
    # Keep the handle in a ``with`` block so it is closed after the request
    # instead of being leaked once per upload.
    with open(filename, "rb") as csv_file:
        form_data = {
            "csv_file": csv_file,
            "sep": ",",
            "name": table_name,
            "con": utils.get_example_database().id,
            "if_exists": "fail",
            "index_label": "test_label",
            "mangle_dupe_cols": False,
        }
        if extra:
            form_data.update(extra)
        return self.get_resp("/csvtodatabaseview/form", data=form_data)

def upload_excel(
    self, filename: str, table_name: str, extra: Optional[Dict[str, str]] = None
):
    """Post ``filename`` to the Excel upload form and return the response.

    Args:
        filename: path of the Excel file to upload.
        table_name: value sent as the form's ``name`` field.
        extra: optional form fields that override or extend the defaults
            (for example ``{"if_exists": "replace"}``).

    Returns:
        Whatever ``self.get_resp`` returns for the form submission.
    """
    # Keep the handle in a ``with`` block so it is closed after the request
    # instead of being leaked once per upload.
    with open(filename, "rb") as excel_file:
        form_data = {
            "excel_file": excel_file,
            "name": table_name,
            "con": utils.get_example_database().id,
            "sheet_name": "Sheet1",
            "if_exists": "fail",
            "index_label": "test_label",
            "mangle_dupe_cols": False,
        }
        if extra:
            form_data.update(extra)
        return self.get_resp("/exceltodatabaseview/form", data=form_data)

@mock.patch(
"superset.models.core.config",
{**app.config, "ALLOWED_USER_CSV_SCHEMA_FUNC": lambda d, u: ["admin_database"]},
)
def test_import_csv_enforced_schema(self):
"""Check CSV uploads against the schema list returned by the patched config.

``ALLOWED_USER_CSV_SCHEMA_FUNC`` is patched to return ``["admin_database"]``.
Uploads without a schema or into ``gold`` must produce the "is not allowed
for csv uploads" message; uploads into ``admin_database`` must produce the
success message. Returns early without asserting on a sqlite backend.
"""
if utils.get_example_database().backend == "sqlite":
# sqlite doesn't support schema / database creation
return
self.login(username="admin")
table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))
full_table_name = f"admin_database.{table_name}"
filename = "testCSV.csv"
self.create_sample_csvfile(filename, ["a,b", "john,1", "paul,2"])
try:
self.enable_csv_upload(utils.get_example_database())

# no schema specified, fail upload
resp = self.upload_csv(filename, table_name)
self.assertIn(
'Database "examples" schema "None" is not allowed for csv uploads', resp
)

# user specified schema matches the expected schema, append
success_msg = f'CSV file "{filename}" uploaded to table "{full_table_name}"'
resp = self.upload_csv(
filename,
table_name,
extra={"schema": "admin_database", "if_exists": "append"},
)
self.assertIn(success_msg, resp)

# same allowed schema again, this time in replace mode
resp = self.upload_csv(
filename,
table_name,
extra={"schema": "admin_database", "if_exists": "replace"},
)
self.assertIn(success_msg, resp)

# user specified schema doesn't match, fail
resp = self.upload_csv(filename, table_name, extra={"schema": "gold"})
self.assertIn(
'Database "examples" schema "gold" is not allowed for csv uploads',
resp,
)
finally:
# always remove the sample file created above
os.remove(filename)

def test_import_csv_explore_database(self):
    """Check that an upload lands under the configured explore database.

    A second database, ``csv_explore_db``, is created with its
    ``explore_database_id`` extra pointing at the examples database. After
    uploading a CSV through it, the resulting table's ``database_id`` must
    equal the examples database id. Returns early without asserting on a
    sqlite backend.
    """
    if utils.get_example_database().backend == "sqlite":
        # sqlite doesn't support schema / database creation
        return
    explore_db_id = utils.get_example_database().id

    upload_db = utils.get_or_create_db(
        "csv_explore_db", app.config["SQLALCHEMY_EXAMPLES_URI"]
    )
    upload_db_id = upload_db.id
    extra = upload_db.get_extra()
    extra["explore_database_id"] = explore_db_id
    upload_db.extra = json.dumps(extra)
    db.session.commit()

    f = "testCSV.csv"
    table = None
    try:
        self.login(username="admin")
        self.enable_csv_upload(DatasetDAO.get_database_by_id(upload_db_id))
        table_name = "".join(
            random.choice(string.ascii_lowercase) for _ in range(5)
        )

        self.create_sample_csvfile(f, ["a,b", "john,1", "paul,2"])
        # initial upload with fail mode
        resp = self.upload_csv(f, table_name)
        self.assertIn(f'CSV file "{f}" uploaded to table "{table_name}"', resp)
        table = self.get_table_by_name(table_name)
        self.assertEqual(table.database_id, explore_db_id)
    finally:
        # cleanup runs even when an assertion above fails, so a failing run
        # does not leave the table row, the extra database or the CSV behind
        if table is not None:
            db.session.delete(table)
        db.session.delete(DatasetDAO.get_database_by_id(upload_db_id))
        db.session.commit()
        if os.path.exists(f):
            os.remove(f)

def test_import_csv(self):
"""Exercise the CSV upload form in fail, append and replace modes.

Uploads two sample files with different columns into one randomly named
table and checks the success/failure message of each step, the refreshed
column metadata, and the handling of explicit and default null values.
"""
self.login(username="admin")
examples_db = utils.get_example_database()
table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))

f1 = "testCSV.csv"
self.create_sample_csvfile(f1, ["a,b", "john,1", "paul,2"])
# second file has a different set of columns (b, c, d) than the first (a, b)
f2 = "testCSV2.csv"
self.create_sample_csvfile(f2, ["b,c,d", "john,1,x", "paul,2,"])
self.enable_csv_upload(examples_db)

try:
success_msg_f1 = f'CSV file "{f1}" uploaded to table "{table_name}"'

# initial upload with fail mode
resp = self.upload_csv(f1, table_name)
self.assertIn(success_msg_f1, resp)

# upload again with fail mode; should fail
fail_msg = f'Unable to upload CSV file "{f1}" to table "{table_name}"'
resp = self.upload_csv(f1, table_name)
self.assertIn(fail_msg, resp)

# upload again with append mode
resp = self.upload_csv(f1, table_name, extra={"if_exists": "append"})
self.assertIn(success_msg_f1, resp)

# upload again with replace mode
resp = self.upload_csv(f1, table_name, extra={"if_exists": "replace"})
self.assertIn(success_msg_f1, resp)

# try to append to table from file with different schema
resp = self.upload_csv(f2, table_name, extra={"if_exists": "append"})
fail_msg_f2 = f'Unable to upload CSV file "{f2}" to table "{table_name}"'
self.assertIn(fail_msg_f2, resp)

# replace table from file with different schema
resp = self.upload_csv(f2, table_name, extra={"if_exists": "replace"})
success_msg_f2 = f'CSV file "{f2}" uploaded to table "{table_name}"'
self.assertIn(success_msg_f2, resp)

table = self.get_table_by_name(table_name)
# make sure the new column name is reflected in the table metadata
self.assertIn("d", table.column_names)

# null values are set
self.upload_csv(
f2,
table_name,
extra={"null_values": '["", "john"]', "if_exists": "replace"},
)
# make sure that john and empty string are replaced with None
engine = examples_db.get_sqla_engine()
data = engine.execute(f"SELECT * from {table_name}").fetchall()
assert data == [(None, 1, "x"), ("paul", 2, None)]

# default null values
self.upload_csv(f2, table_name, extra={"if_exists": "replace"})
# with the defaults only the empty string becomes None; "john" is kept
data = engine.execute(f"SELECT * from {table_name}").fetchall()
assert data == [("john", 1, "x"), ("paul", 2, None)]

finally:
# always remove both sample files created above
os.remove(f1)
os.remove(f2)

def test_import_excel(self):
"""Exercise the Excel upload form in fail, append and replace modes.

Uploads one sample workbook repeatedly into a randomly named table, checks
the success/failure message of each step, and finally checks the rows
stored in the table.
"""
self.login(username="admin")
table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))
f1 = "testExcel.xlsx"
self.create_sample_excelfile(f1, {"a": ["john", "paul"], "b": [1, 2]})
self.enable_csv_upload(utils.get_example_database())

try:
success_msg_f1 = f'Excel file "{f1}" uploaded to table "{table_name}"'

# initial upload with fail mode
resp = self.upload_excel(f1, table_name)
self.assertIn(success_msg_f1, resp)

# upload again with fail mode; should fail
fail_msg = f'Unable to upload Excel file "{f1}" to table "{table_name}"'
resp = self.upload_excel(f1, table_name)
self.assertIn(fail_msg, resp)

# upload again with append mode
resp = self.upload_excel(f1, table_name, extra={"if_exists": "append"})
self.assertIn(success_msg_f1, resp)

# upload again with replace mode
resp = self.upload_excel(f1, table_name, extra={"if_exists": "replace"})
self.assertIn(success_msg_f1, resp)

# make sure the table holds exactly the two uploaded rows after the replace;
# the leading 0/1 is presumably the index column written under
# "index_label" -- TODO confirm
data = (
utils.get_example_database()
.get_sqla_engine()
.execute(f"SELECT * from {table_name}")
.fetchall()
)
assert data == [(0, "john", 1), (1, "paul", 2)]
finally:
# always remove the sample workbook created above
os.remove(f1)

def test_dataframe_timezone(self):
tz = pytz.FixedOffset(60)
data = [
Expand Down