Skip to content

Commit

Permalink
feat: add replace option to hive csv upload
Browse files (browse the repository at this point in the history)
  • Loading branch information
erik_ritter committed Jun 9, 2020
1 parent 526ee3c commit b0bca50
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
21 changes: 20 additions & 1 deletion superset/db_engine_specs/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from superset import app, cache, conf
from superset.db_engine_specs.base import BaseEngineSpec
from superset.db_engine_specs.presto import PrestoEngineSpec
from superset.exceptions import SupersetException
from superset.models.sql_lab import Query
from superset.sql_parse import Table
from superset.utils import core as utils
Expand Down Expand Up @@ -116,6 +117,10 @@ def create_table_from_csv( # pylint: disable=too-many-arguments, too-many-local
) -> None:
"""Uploads a csv file and creates a superset datasource in Hive."""

if_exists = df_to_sql_kwargs["if_exists"]
if if_exists == "append":
raise SupersetException("Append operation not currently supported")

def convert_to_hive_type(col_type: str) -> str:
"""maps tableschema's types to hive types"""
tableschema_to_hive_types = {
Expand Down Expand Up @@ -153,6 +158,20 @@ def convert_to_hive_type(col_type: str) -> str:
)
schema_definition = ", ".join(column_name_and_type)

# ensure table doesn't already exist
if (
if_exists == "fail"
and not database.get_df(
f"SHOW TABLES IN {table.schema} LIKE '{table.table}'"
).empty
):
raise SupersetException("Table already exists")

engine = cls.get_engine(database)

if if_exists == "replace":
engine.execute(f"DROP TABLE IF EXISTS {str(table)}")

# Optional dependency
import boto3 # pylint: disable=import-error

Expand All @@ -163,12 +182,12 @@ def convert_to_hive_type(col_type: str) -> str:
bucket_path,
os.path.join(upload_prefix, table.table, os.path.basename(filename)),
)

# TODO(bkyryliuk): support other delimiters
sql = f"""CREATE TABLE {str(table)} ( {schema_definition} )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS
TEXTFILE LOCATION '{location}'
tblproperties ('skip.header.line.count'='1')"""
engine = cls.get_engine(database)
engine.execute(sql)

@classmethod
Expand Down
13 changes: 13 additions & 0 deletions tests/db_engine_specs/hive_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from unittest import mock

from superset.db_engine_specs.hive import HiveEngineSpec
from superset.exceptions import SupersetException
from superset.sql_parse import Table
from tests.db_engine_specs.base_tests import DbEngineSpecTestCase


Expand Down Expand Up @@ -162,3 +164,14 @@ def test_convert_dttm(self):
HiveEngineSpec.convert_dttm("TIMESTAMP", dttm),
"CAST('2019-01-02 03:04:05.678900' AS TIMESTAMP)",
)

def test_create_table_from_csv_append(self) -> None:
    # An upload requested with if_exists="append" is expected to be rejected
    # with a SupersetException.
    csv_filename = "foo.csv"
    target_table = Table("foobar")
    upload_kwargs = {"if_exists": "append"}

    with self.assertRaises(SupersetException):
        HiveEngineSpec.create_table_from_csv(
            csv_filename, target_table, None, {}, upload_kwargs
        )

0 comments on commit b0bca50

Please sign in to comment.