Skip to content

Commit

Permalink
Refactor the query runner to enable async mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
Bogdan Kyryliuk committed Aug 16, 2016
1 parent 5688246 commit e1e3382
Show file tree
Hide file tree
Showing 11 changed files with 578 additions and 292 deletions.
4 changes: 4 additions & 0 deletions bogdan.todo
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
1. [] implement the polling of the query results
2. [] implement the retrieving of the CTA results
3. [] implement parsing of the query to retrieve the table names

16 changes: 0 additions & 16 deletions caravel/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,7 @@

# Set this API key to enable Mapbox visualizations
MAPBOX_API_KEY = ""
# Maximum number of rows returned in the SQL editor
SQL_MAX_ROW = 1000

# Default celery config is to use SQLA as a broker, in a production setting
# you'll want to use a proper broker as specified here:
# http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
"""
# Example:
class CeleryConfig(object):
BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'
CELERY_IMPORTS = ('caravel.tasks', )
CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite'
CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
CELERY_CONFIG = CeleryConfig
"""
CELERY_CONFIG = None

try:
from caravel_config import * # noqa
Expand All @@ -203,4 +188,3 @@ class CeleryConfig(object):

if not CACHE_DEFAULT_TIMEOUT:
CACHE_DEFAULT_TIMEOUT = CACHE_CONFIG.get('CACHE_DEFAULT_TIMEOUT')

60 changes: 60 additions & 0 deletions caravel/extract_table_names.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 Andi Albrecht, albrecht.andi@gmail.com
#
# This example is part of python-sqlparse and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
#
# This example illustrates how to extract table names from nested
# SELECT statements.
#
# See:
# http://groups.google.com/group/sqlparse/browse_thread/thread/b0bd9a022e9d4895

import sqlparse
from sqlparse.sql import IdentifierList, Identifier
from sqlparse.tokens import Keyword, DML


def is_subselect(parsed):
if not parsed.is_group():
return False
for item in parsed.tokens:
if item.ttype is DML and item.value.upper() == 'SELECT':
return True
return False


def extract_from_part(parsed):
from_seen = False
for item in parsed.tokens:
if from_seen:
if is_subselect(item):
for x in extract_from_part(item):
yield x
elif item.ttype is Keyword:
raise StopIteration
else:
yield item
elif item.ttype is Keyword and item.value.upper() == 'FROM':
from_seen = True


def extract_table_identifiers(token_stream):
for item in token_stream:
if isinstance(item, IdentifierList):
for identifier in item.get_identifiers():
yield identifier.get_name()
elif isinstance(item, Identifier):
yield item.get_name()
# It's a bug to check for Keyword here, but in the example
# above some tables names are identified as keywords...
elif item.ttype is Keyword:
yield item.value


# TODO(bkyryliuk): add logic to support joins and unions.
def extract_tables(sql):
stream = extract_from_part(sqlparse.parse(sql)[0])
return list(extract_table_identifiers(stream))
15 changes: 12 additions & 3 deletions caravel/migrations/versions/ad82a75afd82_add_query_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,26 @@
from alembic import op
import sqlalchemy as sa


def upgrade():
op.create_table('query',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('database_id', sa.Integer(), nullable=False),
sa.Column('tmp_table_name', sa.String(length=64), nullable=True),
sa.Column('tmp_table_name', sa.String(length=256), nullable=True),
sa.Column('tab_name', sa.String(length=256),nullable=True),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=16), nullable=True),
sa.Column('name', sa.String(length=64), nullable=True),
sa.Column('sql', sa.Text, nullable=True),
sa.Column('name', sa.String(length=256), nullable=True),
sa.Column('schema', sa.String(length=256), nullable=True),
sa.Column('sql', sa.Text(), nullable=True),
sa.Column('select_sql', sa.Text(), nullable=True),
sa.Column('executed_sql', sa.Text(), nullable=True),
sa.Column('limit', sa.Integer(), nullable=True),
sa.Column('limit_used', sa.Boolean(), nullable=True),
sa.Column('select_as_cta', sa.Boolean(), nullable=True),
sa.Column('select_as_cta_used', sa.Boolean(), nullable=True),
sa.Column('progress', sa.Integer(), nullable=True),
sa.Column('error_message', sa.Text(), nullable=True),
sa.Column('start_time', sa.DateTime(), nullable=True),
sa.Column('end_time', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['database_id'], [u'dbs.id'], ),
Expand Down
28 changes: 24 additions & 4 deletions caravel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ class Database(Model, AuditMixinNullable):
sqlalchemy_uri = Column(String(1024))
password = Column(EncryptedType(String(1024), config.get('SECRET_KEY')))
cache_timeout = Column(Integer)
select_as_create_table_as = Column(Boolean, default=True)
select_as_create_table_as = Column(Boolean, default=False)
extra = Column(Text, default=textwrap.dedent("""\
{
"metadata_params": {},
Expand Down Expand Up @@ -1711,6 +1711,16 @@ class FavStar(Model):


class QueryStatus:
def from_presto_states(self, presto_status):
if presto_status.lower() == 'running':
return QueryStatus.IN_PROGRESS
if presto_status.lower() == 'running':
return QueryStatus.IN_PROGRESS
if presto_status.lower() == 'running':
return QueryStatus.IN_PROGRESS
if presto_status.lower() == 'running':
return QueryStatus.IN_PROGRESS

SCHEDULED = 'SCHEDULED'
CANCELLED = 'CANCELLED'
IN_PROGRESS = 'IN_PROGRESS'
Expand All @@ -1729,18 +1739,28 @@ class Query(Model):
database_id = Column(Integer, ForeignKey('dbs.id'), nullable=False)

# Store the tmp table into the DB only if the user asks for it.
tmp_table_name = Column(String(64))
tmp_table_name = Column(String(256))
user_id = Column(Integer, ForeignKey('ab_user.id'), nullable=True)

# models.QueryStatus
status = Column(String(16))

name = Column(String(64))
name = Column(String(256))
tab_name = Column(String(256))
schema = Column(String(256))
sql = Column(Text)
# Could be configured in the caravel config
# Query to retrieve the results,
# used only in case of select_as_cta_used is true.
select_sql = Column(Text)
executed_sql = Column(Text)
# Could be configured in the caravel config.
limit = Column(Integer)
limit_used = Column(Boolean)
select_as_cta = Column(Boolean)
select_as_cta_used = Column(Boolean)

# 1..100
progress = Column(Integer)
error_message = Column(Text)
start_time = Column(DateTime)
end_time = Column(DateTime)

0 comments on commit e1e3382

Please sign in to comment.