Merge pull request #514 from alexanderlz/master
Feature: Support Hive as datasource
arikfr committed Jul 30, 2015
2 parents 41ca132 + c140668 commit fe1cc78
Showing 5 changed files with 144 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -11,7 +11,7 @@ Prior to **_re:dash_**, we tried to use traditional BI suites and discovered a s

**_re:dash_** was built to allow fast and easy access to billions of records that we process and collect using Amazon Redshift ("petabyte scale data warehouse" that "speaks" PostgreSQL).
Today **_re:dash_** has support for querying multiple databases, including: Redshift, Google BigQuery, PostgreSQL, MySQL, Graphite,
-Presto, Google Spreadsheets, Cloudera Impala and custom scripts.
+Presto, Google Spreadsheets, Cloudera Impala, Hive and custom scripts.

**_re:dash_** consists of two parts:

7 changes: 7 additions & 0 deletions redash/query_runner/__init__.py
@@ -70,6 +70,13 @@ def run_query(self, query):
def get_schema(self):
return []

+    def _run_query_internal(self, query):
+        results, error = self.run_query(query)
+
+        if error is not None:
+            raise Exception("Failed running query [%s]: %s" % (query, error))
+        return json.loads(results)['rows']

@classmethod
def to_dict(cls):
return {
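The `_run_query_internal` helper is hoisted into the base query runner so other SQL-speaking runners can reuse it (it previously lived in the Impala runner; see the deletion further down). A minimal sketch, not part of this commit, of how a runner method consumes it, assuming `run_query` returns a `(json_string, error)` pair:

    # Sketch: inside a BaseQueryRunner subclass; "show schemas" is a Hive/Impala statement.
    rows = self._run_query_internal("show schemas")   # list of row dicts parsed from run_query's JSON
    names = [row['database_name'] for row in rows]    # the key mirrors get_schema below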
135 changes: 135 additions & 0 deletions redash/query_runner/hive_ds.py
@@ -0,0 +1,135 @@
import json
import logging

from redash.query_runner import *
from redash.utils import JSONEncoder

logger = logging.getLogger(__name__)

try:
from pyhive import hive
enabled = True
except ImportError as e:
logger.exception(e)
logger.warning("Missing dependencies. Please install pyhive.")
logger.warning("You can use pip: pip install pyhive")
enabled = False

COLUMN_NAME = 0
COLUMN_TYPE = 1

# pyhive reports column types in cursor.description as Thrift TTypeId names
# (e.g. 'INT_TYPE'), not plain Hive type names.
types_map = {
    'BIGINT_TYPE': TYPE_INTEGER,
    'TINYINT_TYPE': TYPE_INTEGER,
    'SMALLINT_TYPE': TYPE_INTEGER,
    'INT_TYPE': TYPE_INTEGER,
    'DOUBLE_TYPE': TYPE_FLOAT,
    'DECIMAL_TYPE': TYPE_FLOAT,
    'FLOAT_TYPE': TYPE_FLOAT,
    'REAL_TYPE': TYPE_FLOAT,
    'BOOLEAN_TYPE': TYPE_BOOLEAN,
    'TIMESTAMP_TYPE': TYPE_DATETIME,
    'DATE_TYPE': TYPE_DATETIME,
    'CHAR_TYPE': TYPE_STRING,
    'STRING_TYPE': TYPE_STRING,
    'VARCHAR_TYPE': TYPE_STRING
}
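# Complex types (ARRAY_TYPE, MAP_TYPE, STRUCT_TYPE) are not mapped;
# types_map.get() in run_query below returns None for their columns.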


class Hive(BaseQueryRunner):
@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"host": {
"type": "string"
},
"port": {
"type": "number"
},
"database": {
"type": "string"
},
"username": {
"type": "string"
}
},
"required": ["host"]
}

@classmethod
def annotate_query(cls):
return False

@classmethod
def type(cls):
return "hive"

def __init__(self, configuration_json):
super(Hive, self).__init__(configuration_json)

    def get_schema(self):
        schemas_query = "show schemas"
        tables_query = "show tables in %s"
        columns_query = "show columns in %s.%s"

        schema = {}
        for schema_name in filter(lambda a: len(a) > 0, map(lambda a: str(a['database_name']), self._run_query_internal(schemas_query))):
            for table_name in filter(lambda a: len(a) > 0, map(lambda a: str(a['tab_name']), self._run_query_internal(tables_query % schema_name))):
                # Qualify the table in the columns query so the lookup works
                # outside the connection's current database.
                columns = filter(lambda a: len(a) > 0, map(lambda a: str(a['field']), self._run_query_internal(columns_query % (schema_name, table_name))))

                if schema_name != 'default':
                    table_name = '{}.{}'.format(schema_name, table_name)

                # Each entry: {'name': 'analytics.events', 'columns': [...]};
                # tables in the default schema keep their bare name.
                schema[table_name] = {'name': table_name, 'columns': columns}

        return schema.values()

    def run_query(self, query):
        connection = None
try:
connection = hive.connect(**self.configuration)

cursor = connection.cursor()

cursor.execute(query)

column_names = []
columns = []

for column in cursor.description:
column_name = column[COLUMN_NAME]
column_names.append(column_name)

columns.append({
'name': column_name,
'friendly_name': column_name,
'type': types_map.get(column[COLUMN_TYPE], None)
})

rows = [dict(zip(column_names, row)) for row in cursor]

data = {'columns': columns, 'rows': rows}
json_data = json.dumps(data, cls=JSONEncoder)
error = None
cursor.close()
        except KeyboardInterrupt:
            cursor.cancel()  # pyhive exposes cancel() on the cursor
            error = "Query cancelled by user."
            json_data = None
        except Exception:
            logger.exception("Error running query.")
            # Re-raise with the original traceback intact.
            raise
        finally:
            if connection:
                connection.close()  # guard: hive.connect() itself may have failed

        return json_data, error

register(Hive)
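A minimal usage sketch, not part of this commit. The hostname and credentials are hypothetical, and whether the constructor receives a dict or a JSON string depends on BaseQueryRunner, which this diff does not show; the keys simply mirror configuration_schema and are passed to hive.connect via **self.configuration:

    # Hypothetical configuration; only "host" is required by the schema above.
    runner = Hive({
        "host": "hive.example.com",  # illustrative hostname
        "port": 10000,               # HiveServer2's default Thrift port
        "database": "default",
        "username": "redash"
    })
    json_data, error = runner.run_query("SELECT 1")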
7 changes: 0 additions & 7 deletions redash/query_runner/impala_ds.py
@@ -79,13 +79,6 @@ def type(cls):
def __init__(self, configuration_json):
super(Impala, self).__init__(configuration_json)

-    def _run_query_internal(self, query):
-        results, error = self.run_query(query)
-
-        if error is not None:
-            raise Exception("Failed getting schema.")
-        return json.loads(results)['rows']

def get_schema(self):
try:
schemas_query = "show schemas;"
1 change: 1 addition & 0 deletions redash/settings.py
@@ -126,6 +126,7 @@ def all_settings():
'redash.query_runner.influx_db',
'redash.query_runner.elasticsearch',
'redash.query_runner.presto',
+    'redash.query_runner.hive_ds',
'redash.query_runner.impala_ds',
])))
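For context (not part of this commit): listing the module here only arranges for it to be imported; the register(Hive) call at the bottom of hive_ds.py runs at import time and adds the runner to the registry.

    # Sketch: importing the module is what registers the runner.
    import redash.query_runner.hive_ds  # register(Hive) executes on import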

