Skip to content

Commit

Permalink
Preliminary commit for Celery backend
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Jun 14, 2016
1 parent 267c019 commit 9eae9d1
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 36 deletions.
6 changes: 6 additions & 0 deletions caravel/bin/caravel
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,11 @@ def refresh_druid():
session.commit()


@manager.command
def worker():
    """Starts a Caravel worker for async query load"""
    # The docstring above is kept verbatim: presumably `manager` surfaces it
    # as the command's help text -- confirm before rewording it.
    # Placeholder command: the worker is not implemented yet, so invoking it
    # always fails loudly instead of silently doing nothing.
    todo_note = "# TODO! @b.kyryliuk"
    raise NotImplementedError(todo_note)


if __name__ == "__main__":
manager.run()
18 changes: 18 additions & 0 deletions caravel/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,23 @@
INTERVAL = 1
BACKUP_COUNT = 30

# Default celery config is to use SQLA as a broker; in a production setting
# you'll want to use a proper broker as specified here:
# http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
#
# Example:
#
#     class CeleryConfig(object):
#         ## Broker settings.
#         BROKER_URL = 'amqp://guest:guest@localhost:5672//'
#         CELERY_IMPORTS = ('myapp.tasks', )
#         CELERY_RESULT_BACKEND = 'db+sqlite:///results.db'
#         CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
#
# Left unset (None) by default.
CELERY_CONFIG = None

# Maximum number of rows returned in the SQL editor
SQL_MAX_ROW = 1000

try:
from caravel_config import * # noqa
Expand All @@ -181,3 +198,4 @@

if not CACHE_DEFAULT_TIMEOUT:
CACHE_DEFAULT_TIMEOUT = CACHE_CONFIG.get('CACHE_DEFAULT_TIMEOUT')

23 changes: 23 additions & 0 deletions caravel/migrations/versions/33459b145c15_allow_temp_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""allow_temp_table
Revision ID: 33459b145c15
Revises: d8bc074f7aad
Create Date: 2016-06-13 15:54:08.117103
"""

# revision identifiers, used by Alembic.
revision = '33459b145c15'
down_revision = 'd8bc074f7aad'

from alembic import op
import sqlalchemy as sa


def upgrade():
    """Add the nullable boolean ``allow_temp_table`` column to ``dbs``."""
    allow_temp_table = sa.Column(
        'allow_temp_table', sa.Boolean(), nullable=True)
    op.add_column('dbs', allow_temp_table)


def downgrade():
    """Drop the ``allow_temp_table`` column from ``dbs``."""
    table_name, column_name = 'dbs', 'allow_temp_table'
    op.drop_column(table_name, column_name)
1 change: 1 addition & 0 deletions caravel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ class Database(Model, AuditMixinNullable):
sqlalchemy_uri = Column(String(1024))
password = Column(EncryptedType(String(1024), config.get('SECRET_KEY')))
cache_timeout = Column(Integer)
allow_temp_table = Column(Boolean, default=False)
extra = Column(Text, default=textwrap.dedent("""\
{
"metadata_params": {},
Expand Down
93 changes: 57 additions & 36 deletions caravel/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
from __future__ import print_function
from __future__ import unicode_literals

from datetime import datetime
import json
import logging
import re
import sys
import time
import traceback
from datetime import datetime

import pandas as pd
import sqlalchemy as sqla

import celery
from flask import (
g, request, redirect, flash, Response, render_template, Markup)
from flask_appbuilder import ModelView, CompactCRUDMixin, BaseView, expose
Expand All @@ -36,6 +37,47 @@
config = app.config
log_this = models.Log.log_this

# Celery application that hosts the SQL tasks defined in this module.
# `config_source` is the constructor keyword Celery loads its configuration
# object from; `celery_config` is not a documented constructor argument, so
# the CELERY_CONFIG setting was never applied when passed under that name.
celery_app = celery.Celery(config_source=config.get('CELERY_CONFIG'))

@celery_app.task
def get_sql_results(database_id, sql, async=False):
"""Gets sql results from a Caravel database connection"""
# TODO @b.kyryliuk handle async
# handle models.Queries (userid, sql, timestamps, status) index on userid, state, start_ddtm
session = db.session()
mydb = session.query(models.Database).filter_by(id=database_id).first()

if (
not self.appbuilder.sm.has_access(
'all_datasource_access', 'all_datasource_access')):
raise utils.CaravelSecurityException(_(
"This view requires the `all_datasource_access` permission"))
content = ""
if mydb:
eng = mydb.get_sqla_engine()
if config.SQL_MAX_ROW:
sql = sql.strip().strip(';')
qry = (
select('*')
.select_from(TextAsFrom(text(sql), ['*']).alias('inner_qry'))
.limit(config.SQL_MAX_ROW)
)
sql = str(qry.compile(eng, compile_kwargs={"literal_binds": True}))
try:
df = pd.read_sql_query(sql=sql, con=eng)
content = df.to_html(
index=False,
na_rep='',
classes=(
"dataframe table table-striped table-bordered "
"table-condensed sql_results").split(' '))
except Exception as e:
content = (
'<div class="alert alert-danger">'
"{}</div>"
).format(e.message)
session.commit()


def check_ownership(obj, raise_if_false=True):
"""Meant to be used in `pre_update` hooks on models to enforce ownership
Expand Down Expand Up @@ -285,7 +327,8 @@ class DatabaseView(CaravelModelView, DeleteMixin): # noqa
datamodel = SQLAInterface(models.Database)
list_columns = ['database_name', 'sql_link', 'creator', 'changed_on_']
add_columns = [
'database_name', 'sqlalchemy_uri', 'cache_timeout', 'extra']
'database_name', 'sqlalchemy_uri', 'cache_timeout', 'extra',
'allow_temp_table']
search_exclude_columns = ('password',)
edit_columns = add_columns
add_template = "caravel/models/database/add.html"
Expand All @@ -305,6 +348,10 @@ class DatabaseView(CaravelModelView, DeleteMixin): # noqa
"gets unpacked into the [sqlalchemy.MetaData]"
"(http://docs.sqlalchemy.org/en/rel_1_0/core/metadata.html"
"#sqlalchemy.schema.MetaData) call. ", True),
'allow_temp_table': (
"Whether Caravel can run async queries by and attempt to "
"store results in temporary tables"
),
}
label_columns = {
'database_name': _("Database"),
Expand Down Expand Up @@ -1018,45 +1065,19 @@ def select_star(self, database_id, table_name):
@log_this
def runsql(self):
"""Runs arbitrary sql and returns and html table"""
session = db.session()
limit = 1000
data = json.loads(request.form.get('data'))
sql = data.get('sql')
database_id = data.get('database_id')
mydb = session.query(models.Database).filter_by(id=database_id).first()

if (
not self.appbuilder.sm.has_access(
'all_datasource_access', 'all_datasource_access')):
raise utils.CaravelSecurityException(_(
"This view requires the `all_datasource_access` permission"))
content = ""
if mydb:
eng = mydb.get_sqla_engine()
if limit:
sql = sql.strip().strip(';')
qry = (
select('*')
.select_from(TextAsFrom(text(sql), ['*']).alias('inner_qry'))
.limit(limit)
)
sql = str(qry.compile(eng, compile_kwargs={"literal_binds": True}))
try:
df = pd.read_sql_query(sql=sql, con=eng)
content = df.to_html(
index=False,
na_rep='',
classes=(
"dataframe table table-striped table-bordered "
"table-condensed sql_results").split(' '))
except Exception as e:
content = (
'<div class="alert alert-danger">'
"{}</div>"
).format(e.message)
session.commit()
# TODO @b.kyryliuk handle async
content = get_sql_results(database_id, sql)
# get_sql_results.async(database_id, sql)
return content

@expose("/async_sql_status/")
def async_sql_status(self, userid=None):
    """Report the status of a user's asynchronous queries (not implemented).

    :param userid: identifier of the user whose queries are inspected.
        Defaults to ``None`` because the route declares no URL variable, so
        nothing is supplied for it when the endpoint is requested.
    :returns: an empty HTTP 501 (Not Implemented) response until the status
        lookup is written; a bare ``return`` would hand the framework
        ``None``, which is not a valid response.
    """
    # TODO @b.kyryliuk
    return Response(status=501)

@has_access
@expose("/refresh_datasources/")
def refresh_datasources(self):
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
install_requires=[
'alembic>=0.8.5, <0.9.0',
'babel==2.3.4',
'celery==3.1.23',
'cryptography>=1.1.1, <2.0.0',
'flask-appbuilder>=1.7.1, <2.0.0',
'Flask-BabelPkg==0.9.6',
Expand Down

0 comments on commit 9eae9d1

Please sign in to comment.