Skip to content

Commit

Permalink
Refactoring Druid & SQLa into a proper "Connector" interface (#2362)
Browse files Browse the repository at this point in the history
* Formalizing the Connector interface

* Checkpoint

* Fixing views

* Fixing tests

* Adding migration

* Tests

* Final

* Addressing comments
  • Loading branch information
mistercrunch committed Mar 10, 2017
1 parent 9a8c3a0 commit 2969cc9
Show file tree
Hide file tree
Showing 32 changed files with 3,781 additions and 3,573 deletions.
8 changes: 4 additions & 4 deletions superset/__init__.py
Expand Up @@ -13,9 +13,9 @@
from flask_appbuilder import SQLA, AppBuilder, IndexView
from flask_appbuilder.baseviews import expose
from flask_migrate import Migrate
from superset.source_registry import SourceRegistry
from superset.connectors.connector_registry import ConnectorRegistry
from werkzeug.contrib.fixers import ProxyFix
from superset import utils
from superset import utils, config # noqa


APP_DIR = os.path.dirname(__file__)
Expand Down Expand Up @@ -104,6 +104,6 @@ def index(self):
# Registering sources
module_datasource_map = app.config.get("DEFAULT_MODULE_DS_MAP")
module_datasource_map.update(app.config.get("ADDITIONAL_MODULE_DS_MAP"))
SourceRegistry.register_sources(module_datasource_map)
ConnectorRegistry.register_sources(module_datasource_map)

from superset import views, config # noqa
from superset import views # noqa
3 changes: 2 additions & 1 deletion superset/cli.py
Expand Up @@ -13,7 +13,7 @@
from flask_migrate import MigrateCommand
from flask_script import Manager

from superset import app, db, data, security
from superset import app, db, security

config = app.config

Expand Down Expand Up @@ -89,6 +89,7 @@ def version(verbose):
help="Load additional test data")
def load_examples(load_test_data):
"""Loads a set of Slices and Dashboards and a supporting dataset """
from superset import data
print("Loading examples into {}".format(db))

data.load_css_templates()
Expand Down
18 changes: 12 additions & 6 deletions superset/config.py
Expand Up @@ -178,7 +178,10 @@
# --------------------------------------------------
# Modules, datasources and middleware to be registered
# --------------------------------------------------
DEFAULT_MODULE_DS_MAP = {'superset.models': ['DruidDatasource', 'SqlaTable']}
DEFAULT_MODULE_DS_MAP = {
'superset.connectors.druid.models': ['DruidDatasource'],
'superset.connectors.sqla.models': ['SqlaTable'],
}
ADDITIONAL_MODULE_DS_MAP = {}
ADDITIONAL_MIDDLEWARE = []

Expand Down Expand Up @@ -292,14 +295,17 @@ class CeleryConfig(object):
BLUEPRINTS = []

try:

if CONFIG_PATH_ENV_VAR in os.environ:
# Explicitly import config module that is not in pythonpath; useful
# for case where app is being executed via pex.
print('Loaded your LOCAL configuration at [{}]'.format(
os.environ[CONFIG_PATH_ENV_VAR]))
imp.load_source('superset_config', os.environ[CONFIG_PATH_ENV_VAR])

from superset_config import * # noqa
import superset_config
print('Loaded your LOCAL configuration at [{}]'.format(
superset_config.__file__))
else:
from superset_config import * # noqa
import superset_config
print('Loaded your LOCAL configuration at [{}]'.format(
superset_config.__file__))
except ImportError:
pass
Empty file added superset/connectors/__init__.py
Empty file.
160 changes: 160 additions & 0 deletions superset/connectors/base.py
@@ -0,0 +1,160 @@
import json

from sqlalchemy import Column, Integer, String, Text, Boolean

from superset import utils
from superset.models.helpers import AuditMixinNullable, ImportMixin


class BaseDatasource(AuditMixinNullable, ImportMixin):

    """A common interface to objects that are queryable
    (tables and datasources)
    """

    # Subclasses must set this, e.g. ``{connector_name}_datasource``
    __tablename__ = None

    # Used to do code highlighting when displaying the query in the UI
    query_language = None

    @property
    def column_names(self):
        """Names of all of this datasource's columns, sorted."""
        return sorted(c.column_name for c in self.columns)

    @property
    def main_dttm_col(self):
        return "timestamp"

    @property
    def groupby_column_names(self):
        """Names of the columns flagged as groupable, sorted."""
        return sorted(c.column_name for c in self.columns if c.groupby)

    @property
    def filterable_column_names(self):
        """Names of the columns flagged as filterable, sorted."""
        return sorted(c.column_name for c in self.columns if c.filterable)

    @property
    def dttm_cols(self):
        # Connectors with time columns are expected to override this
        return []

    @property
    def url(self):
        return '/{}/edit/{}'.format(self.baselink, self.id)

    @property
    def explore_url(self):
        if self.default_endpoint:
            return self.default_endpoint
        return "/superset/explore/{obj.type}/{obj.id}/".format(obj=self)

    @property
    def column_formats(self):
        """Mapping of metric name to its d3 format, for metrics that set one."""
        return {
            m.metric_name: m.d3format
            for m in self.metrics
            if m.d3format
        }

    @property
    def data(self):
        """Data representation of the datasource sent to the frontend"""
        # One (value, label) pair per column and direction, e.g.
        # ('["name", true]', 'name [asc]')
        order_by_choices = []
        for s in sorted(self.column_names):
            order_by_choices.append((json.dumps([s, True]), s + ' [asc]'))
            order_by_choices.append((json.dumps([s, False]), s + ' [desc]'))

        d = {
            'all_cols': utils.choicify(self.column_names),
            'column_formats': self.column_formats,
            'edit_url': self.url,
            'filter_select': self.filter_select_enabled,
            'filterable_cols': utils.choicify(self.filterable_column_names),
            'gb_cols': utils.choicify(self.groupby_column_names),
            'id': self.id,
            'metrics_combo': self.metrics_combo,
            'name': self.name,
            'order_by_choices': order_by_choices,
            'type': self.type,
        }

        # TODO move this block to SqlaTable.data
        if self.type == 'table':
            grains = self.database.grains() or []
            if grains:
                grains = [(g.name, g.name) for g in grains]
            d['granularity_sqla'] = utils.choicify(self.dttm_cols)
            d['time_grain_sqla'] = grains
        return d


class BaseColumn(AuditMixinNullable, ImportMixin):

    """Interface for column"""

    # Subclasses must set this, e.g. ``{connector_name}_column``
    __tablename__ = None

    id = Column(Integer, primary_key=True)
    column_name = Column(String(255))
    verbose_name = Column(String(1024))
    is_active = Column(Boolean, default=True)
    type = Column(String(32))
    groupby = Column(Boolean, default=False)
    count_distinct = Column(Boolean, default=False)
    sum = Column(Boolean, default=False)
    avg = Column(Boolean, default=False)
    max = Column(Boolean, default=False)
    min = Column(Boolean, default=False)
    filterable = Column(Boolean, default=False)
    description = Column(Text)

    # [optional] Set this to support import/export functionality
    export_fields = []

    def __repr__(self):
        return self.column_name

    # Substrings used to infer a column's generic kind from its native
    # ``type`` string (matched case-insensitively below)
    num_types = ('DOUBLE', 'FLOAT', 'INT', 'BIGINT', 'LONG', 'REAL', 'NUMERIC')
    date_types = ('DATE', 'TIME', 'DATETIME')
    str_types = ('VARCHAR', 'STRING', 'CHAR')

    @property
    def is_num(self):
        """Whether the column's native type looks numeric."""
        return any(t in self.type.upper() for t in self.num_types)

    @property
    def is_time(self):
        """Whether the column's native type looks temporal."""
        return any(t in self.type.upper() for t in self.date_types)

    @property
    def is_string(self):
        """Whether the column's native type looks string-like."""
        return any(t in self.type.upper() for t in self.str_types)


class BaseMetric(AuditMixinNullable, ImportMixin):

    """Interface for Metrics"""

    # Subclasses must set this, e.g. ``{connector_name}_metric``
    __tablename__ = None

    id = Column(Integer, primary_key=True)
    metric_name = Column(String(512))
    verbose_name = Column(String(1024))
    metric_type = Column(String(32))
    description = Column(Text)
    is_restricted = Column(Boolean, default=False, nullable=True)
    d3format = Column(String(128))

    # NOTE: implementations should also declare a datasource relationship
    # pointing to a derivative of BaseDatasource, along with a FK:
    #
    #   datasource_name = Column(
    #       String(255),
    #       ForeignKey('datasources.datasource_name'))
    #   datasource = relationship(
    #       # needs to be altered to point to {Connector}Datasource
    #       'BaseDatasource',
    #       backref=backref('metrics', cascade='all, delete-orphan'),
    #       enable_typechecks=False)

    @property
    def perm(self):
        """Permission string for this metric; connectors must implement."""
        raise NotImplementedError()
@@ -1,7 +1,7 @@
from sqlalchemy.orm import subqueryload


class SourceRegistry(object):
class ConnectorRegistry(object):
""" Central Registry for all available datasource engines"""

sources = {}
Expand All @@ -26,15 +26,15 @@ def get_datasource(cls, datasource_type, datasource_id, session):
@classmethod
def get_all_datasources(cls, session):
datasources = []
for source_type in SourceRegistry.sources:
for source_type in ConnectorRegistry.sources:
datasources.extend(
session.query(SourceRegistry.sources[source_type]).all())
session.query(ConnectorRegistry.sources[source_type]).all())
return datasources

@classmethod
def get_datasource_by_name(cls, session, datasource_type, datasource_name,
schema, database_name):
datasource_class = SourceRegistry.sources[datasource_type]
datasource_class = ConnectorRegistry.sources[datasource_type]
datasources = session.query(datasource_class).all()

# Filter datasources that don't have database.
Expand All @@ -45,7 +45,7 @@ def get_datasource_by_name(cls, session, datasource_type, datasource_name,

@classmethod
def query_datasources_by_permissions(cls, session, database, permissions):
datasource_class = SourceRegistry.sources[database.type]
datasource_class = ConnectorRegistry.sources[database.type]
return (
session.query(datasource_class)
.filter_by(database_id=database.id)
Expand All @@ -56,7 +56,7 @@ def query_datasources_by_permissions(cls, session, database, permissions):
@classmethod
def get_eager_datasource(cls, session, datasource_type, datasource_id):
"""Returns datasource with columns and metrics."""
datasource_class = SourceRegistry.sources[datasource_type]
datasource_class = ConnectorRegistry.sources[datasource_type]
return (
session.query(datasource_class)
.options(
Expand Down
2 changes: 2 additions & 0 deletions superset/connectors/druid/__init__.py
@@ -0,0 +1,2 @@
from . import models # noqa
from . import views # noqa

0 comments on commit 2969cc9

Please sign in to comment.