Skip to content

Commit

Permalink
Import CSV (#3643)
Browse files Browse the repository at this point in the history
* add upload csv button to sources dropdown

* upload csv to non-hive datasources

* upload csv to hive datasource

* update FAQ page

* add tests

* fix linting errors and merge conflicts

* Update .travis.yml

* Update tox.ini
  • Loading branch information
timifasubaa authored and mistercrunch committed Nov 28, 2017
1 parent c5ddf57 commit 268edcf
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 5 deletions.
7 changes: 7 additions & 0 deletions docs/faq.rst
Expand Up @@ -45,6 +45,13 @@ visualizations.
https://github.com/airbnb/superset/issues?q=label%3Aexample+is%3Aclosed


Can I upload and visualize csv data?
-------------------------------------

Yes, using the ``Upload a CSV`` button under the ``Sources``
menu item. This brings up a form that allows you specify required information. After creating the table from CSV, it can then be loadede like any other on the ``Sources -> Tables``page.
Why are my queries timing out?
------------------------------
Expand Down
16 changes: 14 additions & 2 deletions superset/config.py
Expand Up @@ -58,9 +58,9 @@
SECRET_KEY = '\2\1thisismyscretkey\1\2\e\y\y\h' # noqa

# The SQLAlchemy connection string.
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR, 'superset.db')
# SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR, 'superset.db')
# SQLALCHEMY_DATABASE_URI = 'mysql://myapp@localhost/myapp'
# SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'
SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'

# In order to hook up a custom password store for all SQLACHEMY connections
# implement a function that takes a single argument of type 'sqla.engine.url',
Expand Down Expand Up @@ -188,6 +188,10 @@
ENABLE_CORS = False
CORS_OPTIONS = {}

# Allowed format types for upload on Database view
# TODO: Add processing of other spreadsheet formats (xls, xlsx etc)
ALLOWED_EXTENSIONS = set(['csv'])

# CSV Options: key/value pairs that will be passed as argument to DataFrame.to_csv method
# note: index option should not be overridden
CSV_EXPORT = {
Expand Down Expand Up @@ -298,6 +302,14 @@ class CeleryConfig(object):
# in SQL Lab by using the "Run Async" button/feature
RESULTS_BACKEND = None

# The S3 bucket where you want to store your external hive tables created
# from CSV files. For example, 'companyname-superset'
CSV_TO_HIVE_UPLOAD_S3_BUCKET = None

# The directory within the bucket specified above that will
# contain all the external tables
CSV_TO_HIVE_UPLOAD_DIRECTORY = 'EXTERNAL_HIVE_TABLES/'

# A dictionary of items that gets merged into the Jinja context for
# SQL Lab. The existing context gets updated with this dictionary,
# meaning values for existing keys get overwritten by the content of this
Expand Down
109 changes: 108 additions & 1 deletion superset/db_engine_specs.py
Expand Up @@ -17,21 +17,30 @@
from __future__ import unicode_literals

from collections import defaultdict, namedtuple
import csv
import inspect
import logging
import os
import re
import textwrap
import time

import boto3
from flask import g
from flask_babel import lazy_gettext as _
import pandas
from sqlalchemy import select
from sqlalchemy.engine import create_engine
from sqlalchemy.engine.url import make_url
from sqlalchemy.sql import text
import sqlparse
from werkzeug.utils import secure_filename

from superset import cache_util, conf, utils
from superset import app, cache_util, conf, db, utils
from superset.utils import QueryStatus, SupersetTemplateException

config = app.config

tracking_url_trans = conf.get('TRACKING_URL_TRANSFORMER')

Grain = namedtuple('Grain', 'name label function')
Expand Down Expand Up @@ -73,6 +82,65 @@ def extra_table_metadata(cls, database, table_name, schema_name):
"""Returns engine-specific table metadata"""
return {}

@staticmethod
def csv_to_df(**kwargs):
kwargs['filepath_or_buffer'] = \
app.config['UPLOAD_FOLDER'] + kwargs['filepath_or_buffer']
kwargs['encoding'] = 'utf-8'
kwargs['iterator'] = True
chunks = pandas.read_csv(**kwargs)
df = pandas.DataFrame()
df = pandas.concat(chunk for chunk in chunks)
return df

@staticmethod
def df_to_db(df, table, **kwargs):
df.to_sql(**kwargs)
table.user_id = g.user.id
table.schema = kwargs['schema']
table.fetch_metadata()
db.session.add(table)
db.session.commit()

@staticmethod
def create_table_from_csv(form, table):
def _allowed_file(filename):
# Only allow specific file extensions as specified in the config
extension = os.path.splitext(filename)[1]
return extension and extension[1:] in app.config['ALLOWED_EXTENSIONS']

filename = secure_filename(form.csv_file.data.filename)
if not _allowed_file(filename):
raise Exception('Invalid file type selected')
kwargs = {
'filepath_or_buffer': filename,
'sep': form.sep.data,
'header': form.header.data if form.header.data else 0,
'index_col': form.index_col.data,
'mangle_dupe_cols': form.mangle_dupe_cols.data,
'skipinitialspace': form.skipinitialspace.data,
'skiprows': form.skiprows.data,
'nrows': form.nrows.data,
'skip_blank_lines': form.skip_blank_lines.data,
'parse_dates': form.parse_dates.data,
'infer_datetime_format': form.infer_datetime_format.data,
'chunksize': 10000,
}
df = BaseEngineSpec.csv_to_df(**kwargs)

df_to_db_kwargs = {
'table': table,
'df': df,
'name': form.name.data,
'con': create_engine(form.con.data, echo=False),
'schema': form.schema.data,
'if_exists': form.if_exists.data,
'index': form.index.data,
'index_label': form.index_label.data,
'chunksize': 10000,
}
BaseEngineSpec.df_to_db(**df_to_db_kwargs)

@classmethod
def escape_sql(cls, sql):
"""Escapes the raw SQL"""
Expand Down Expand Up @@ -721,6 +789,45 @@ def fetch_result_sets(cls, db, datasource_type, force=False):
return BaseEngineSpec.fetch_result_sets(
db, datasource_type, force=force)

@staticmethod
def create_table_from_csv(form, table):
"""Uploads a csv file and creates a superset datasource in Hive."""
def get_column_names(filepath):
with open(filepath, 'rb') as f:
return csv.reader(f).next()

table_name = form.name.data
filename = form.csv_file.data.filename

bucket_path = app.config['CSV_TO_HIVE_UPLOAD_BUCKET']

if not bucket_path:
logging.info('No upload bucket specified')
raise Exception(
'No upload bucket specified. You can specify one in the config file.')

upload_prefix = app.config['CSV_TO_HIVE_UPLOAD_DIRECTORY']
dest_path = os.path.join(table_name, filename)

upload_path = app.config['UPLOAD_FOLDER'] + \
secure_filename(form.csv_file.data.filename)
column_names = get_column_names(upload_path)
schema_definition = ', '.join(
[s + ' STRING ' for s in column_names])

s3 = boto3.client('s3')
location = os.path.join('s3a://', bucket_path, upload_prefix, table_name)
s3.upload_file(
upload_path, 'airbnb-superset',
os.path.join(upload_prefix, table_name, filename))
sql = """CREATE EXTERNAL TABLE {table_name} ( {schema_definition} )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS
TEXTFILE LOCATION '{location}'""".format(**locals())

logging.info(form.con.data)
engine = create_engine(form.con.data)
engine.execute(sql)

@classmethod
def convert_dttm(cls, target_type, dttm):
tt = target_type.upper()
Expand Down
123 changes: 123 additions & 0 deletions superset/forms.py
@@ -0,0 +1,123 @@
"""Contains the logic to create cohesive forms on the explore view"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_appbuilder.forms import DynamicForm
from flask_babel import lazy_gettext as _
from flask_wtf.file import FileAllowed, FileField, FileRequired
from wtforms import (
BooleanField, IntegerField, SelectField, StringField)
from wtforms.validators import DataRequired, NumberRange, Optional

from superset import app

config = app.config


class CsvToDatabaseForm(DynamicForm):
name = StringField(
_('Table Name'),
description=_('Name of table to be created from csv data.'),
validators=[DataRequired()],
widget=BS3TextFieldWidget())
csv_file = FileField(
_('CSV File'),
description=_('Select a CSV file to be uploaded to a database.'),
validators=[
FileRequired(), FileAllowed(['csv'], _('CSV Files Only!'))])

con = SelectField(
_('Database'),
description=_('database in which to add above table.'),
validators=[DataRequired()],
choices=[])
sep = StringField(
_('Delimiter'),
description=_('Delimiter used by CSV file (for whitespace use \s+).'),
validators=[DataRequired()],
widget=BS3TextFieldWidget())
if_exists = SelectField(
_('Table Exists'),
description=_(
'If table exists do one of the following: '
'Fail (do nothing), Replace (drop and recreate table) '
'or Append (insert data).'),
choices=[
('fail', _('Fail')), ('replace', _('Replace')),
('append', _('Append'))],
validators=[DataRequired()])

schema = StringField(
_('Schema'),
description=_('Specify a schema (if database flavour supports this).'),
validators=[Optional()],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])
header = IntegerField(
_('Header Row'),
description=_(
'Row containing the headers to use as '
'column names (0 is first line of data). '
'Leave empty if there is no header row.'),
validators=[Optional()],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])
index_col = IntegerField(
_('Index Column'),
description=_(
'Column to use as the row labels of the '
'dataframe. Leave empty if no index column.'),
validators=[Optional(), NumberRange(0, 1E+20)],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])
mangle_dupe_cols = BooleanField(
_('Mangle Duplicate Columns'),
description=_('Specify duplicate columns as "X.0, X.1".'))
skipinitialspace = BooleanField(
_('Skip Initial Space'),
description=_('Skip spaces after delimiter.'))
skiprows = IntegerField(
_('Skip Rows'),
description=_('Number of rows to skip at start of file.'),
validators=[Optional(), NumberRange(0, 1E+20)],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])
nrows = IntegerField(
_('Rows to Read'),
description=_('Number of rows of file to read.'),
validators=[Optional(), NumberRange(0, 1E+20)],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])
skip_blank_lines = BooleanField(
_('Skip Blank Lines'),
description=_(
'Skip blank lines rather than interpreting them '
'as NaN values.'))
parse_dates = BooleanField(
_('Parse Dates'),
description=_('Parse date values.'))
infer_datetime_format = BooleanField(
_('Infer Datetime Format'),
description=_(
'Use Pandas to interpret the datetime format '
'automatically.'))
decimal = StringField(
_('Decimal Character'),
description=_('Character to interpret as decimal point.'),
validators=[Optional()],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or '.'])
index = BooleanField(
_('Dataframe Index'),
description=_('Write dataframe index as a column.'))
index_label = StringField(
_('Column Label(s)'),
description=_(
'Column label for index column(s). If None is given '
'and Dataframe Index is True, Index Names are used.'),
validators=[Optional()],
widget=BS3TextFieldWidget(),
filters=[lambda x: x or None])

0 comments on commit 268edcf

Please sign in to comment.