Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async support for the queries in the SQL Lab. WIP. #946

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions bogdan.todo
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
1. [] implement the polling of the query results
2. [] implement the retrieving of the CTA results
3. [] implement parsing of the query to retrieve the table names

16 changes: 0 additions & 16 deletions caravel/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,7 @@

# Set this API key to enable Mapbox visualizations
MAPBOX_API_KEY = ""
# Maximum number of rows returned in the SQL editor
SQL_MAX_ROW = 1000

# Default celery config is to use SQLA as a broker, in a production setting
# you'll want to use a proper broker as specified here:
# http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
"""
# Example:
class CeleryConfig(object):
BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'
CELERY_IMPORTS = ('caravel.tasks', )
CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite'
CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
CELERY_CONFIG = CeleryConfig
"""
CELERY_CONFIG = None

try:
from caravel_config import * # noqa
Expand All @@ -203,4 +188,3 @@ class CeleryConfig(object):

if not CACHE_DEFAULT_TIMEOUT:
CACHE_DEFAULT_TIMEOUT = CACHE_CONFIG.get('CACHE_DEFAULT_TIMEOUT')

60 changes: 60 additions & 0 deletions caravel/extract_table_names.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 Andi Albrecht, albrecht.andi@gmail.com
#
# This example is part of python-sqlparse and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
#
# This example illustrates how to extract table names from nested
# SELECT statements.
#
# See:
# http://groups.google.com/group/sqlparse/browse_thread/thread/b0bd9a022e9d4895

import sqlparse
from sqlparse.sql import IdentifierList, Identifier
from sqlparse.tokens import Keyword, DML


def is_subselect(parsed):
if not parsed.is_group():
return False
for item in parsed.tokens:
if item.ttype is DML and item.value.upper() == 'SELECT':
return True
return False


def extract_from_part(parsed):
from_seen = False
for item in parsed.tokens:
if from_seen:
if is_subselect(item):
for x in extract_from_part(item):
yield x
elif item.ttype is Keyword:
raise StopIteration
else:
yield item
elif item.ttype is Keyword and item.value.upper() == 'FROM':
from_seen = True


def extract_table_identifiers(token_stream):
for item in token_stream:
if isinstance(item, IdentifierList):
for identifier in item.get_identifiers():
yield identifier.get_name()
elif isinstance(item, Identifier):
yield item.get_name()
# It's a bug to check for Keyword here, but in the example
# above some tables names are identified as keywords...
elif item.ttype is Keyword:
yield item.value


# TODO(bkyryliuk): add logic to support joins and unions.
def extract_tables(sql):
stream = extract_from_part(sqlparse.parse(sql)[0])
return list(extract_table_identifiers(stream))
15 changes: 12 additions & 3 deletions caravel/migrations/versions/ad82a75afd82_add_query_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,26 @@
from alembic import op
import sqlalchemy as sa


def upgrade():
op.create_table('query',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('database_id', sa.Integer(), nullable=False),
sa.Column('tmp_table_name', sa.String(length=64), nullable=True),
sa.Column('tmp_table_name', sa.String(length=256), nullable=True),
sa.Column('tab_name', sa.String(length=256),nullable=True),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=16), nullable=True),
sa.Column('name', sa.String(length=64), nullable=True),
sa.Column('sql', sa.Text, nullable=True),
sa.Column('name', sa.String(length=256), nullable=True),
sa.Column('schema', sa.String(length=256), nullable=True),
sa.Column('sql', sa.Text(), nullable=True),
sa.Column('select_sql', sa.Text(), nullable=True),
sa.Column('executed_sql', sa.Text(), nullable=True),
sa.Column('limit', sa.Integer(), nullable=True),
sa.Column('limit_used', sa.Boolean(), nullable=True),
sa.Column('select_as_cta', sa.Boolean(), nullable=True),
sa.Column('select_as_cta_used', sa.Boolean(), nullable=True),
sa.Column('progress', sa.Integer(), nullable=True),
sa.Column('error_message', sa.Text(), nullable=True),
sa.Column('start_time', sa.DateTime(), nullable=True),
sa.Column('end_time', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['database_id'], [u'dbs.id'], ),
Expand Down
28 changes: 24 additions & 4 deletions caravel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ class Database(Model, AuditMixinNullable):
sqlalchemy_uri = Column(String(1024))
password = Column(EncryptedType(String(1024), config.get('SECRET_KEY')))
cache_timeout = Column(Integer)
select_as_create_table_as = Column(Boolean, default=True)
select_as_create_table_as = Column(Boolean, default=False)
extra = Column(Text, default=textwrap.dedent("""\
{
"metadata_params": {},
Expand Down Expand Up @@ -1711,6 +1711,16 @@ class FavStar(Model):


class QueryStatus:
def from_presto_states(self, presto_status):
if presto_status.lower() == 'running':
return QueryStatus.IN_PROGRESS
if presto_status.lower() == 'running':
return QueryStatus.IN_PROGRESS
if presto_status.lower() == 'running':
return QueryStatus.IN_PROGRESS
if presto_status.lower() == 'running':
return QueryStatus.IN_PROGRESS

SCHEDULED = 'SCHEDULED'
CANCELLED = 'CANCELLED'
IN_PROGRESS = 'IN_PROGRESS'
Expand All @@ -1729,18 +1739,28 @@ class Query(Model):
database_id = Column(Integer, ForeignKey('dbs.id'), nullable=False)

# Store the tmp table into the DB only if the user asks for it.
tmp_table_name = Column(String(64))
tmp_table_name = Column(String(256))
user_id = Column(Integer, ForeignKey('ab_user.id'), nullable=True)

# models.QueryStatus
status = Column(String(16))

name = Column(String(64))
name = Column(String(256))
tab_name = Column(String(256))
schema = Column(String(256))
sql = Column(Text)
# Could be configured in the caravel config
# Query to retrieve the results,
# used only in case of select_as_cta_used is true.
select_sql = Column(Text)
executed_sql = Column(Text)
# Could be configured in the caravel config.
limit = Column(Integer)
limit_used = Column(Boolean)
select_as_cta = Column(Boolean)
select_as_cta_used = Column(Boolean)

# 1..100
progress = Column(Integer)
error_message = Column(Text)
start_time = Column(DateTime)
end_time = Column(DateTime)