Skip to content

Commit

Permalink
Merge pull request #434 from erans/master
Browse files Browse the repository at this point in the history
MongoDB aggregation support + mongo documentation (as comments)
  • Loading branch information
arikfr committed May 17, 2015
2 parents 105971c + 27ecf5f commit 59b87ec
Showing 1 changed file with 117 additions and 28 deletions.
145 changes: 117 additions & 28 deletions redash/query_runner/mongodb.py
Expand Up @@ -12,6 +12,7 @@
try:
import pymongo
from bson.objectid import ObjectId
from bson.son import SON
enabled = True

except ImportError:
Expand All @@ -32,24 +33,66 @@

# Matches an ISODate("...") wrapper (case-insensitively) and captures the text
# between the quotes. Written as a raw string so that the regex escapes "\("
# and "\)" are not treated as (invalid) string escape sequences.
date_regex = re.compile(r'ISODate\("(.*)"\)', re.IGNORECASE)


def _get_column_by_name(columns, column_name):
    """Return the first column dict whose "name" equals column_name.

    Entries without a "name" key are skipped. Returns None when no entry
    matches.
    """
    candidates = (
        column for column in columns
        if "name" in column and column["name"] == column_name
    )
    return next(candidates, None)


def _convert_date(q, field_name):
    """Replace an ISODate("...") string stored in q[field_name] with a datetime.

    The value is matched against the module-level date_regex. When it does
    not match, q is left untouched. When it does, the captured text is parsed
    as "YYYY-MM-DD", "YYYY-MM-DD HH:MM" or "YYYY-MM-DD HH:MM:SS" and the
    resulting naive datetime is written back into q in place.

    Raises ValueError (from strptime) when the captured text fits none of
    these formats.
    """
    matches = date_regex.findall(q[field_name])
    if not matches:
        return

    date_text = matches[0]

    # Pick the format from the captured date text itself, so that a colon
    # elsewhere in the field value cannot select the wrong format.
    colon_count = date_text.count(":")
    if colon_count == 0:
        date_format = "%Y-%m-%d"
    elif colon_count == 1:
        date_format = "%Y-%m-%d %H:%M"
    else:
        date_format = "%Y-%m-%d %H:%M:%S"

    # Parse straight into a datetime. Going through time.mktime() and
    # fromtimestamp() interprets the value as local time, which can shift it
    # around DST transitions and fail for dates outside the platform's
    # timestamp range.
    q[field_name] = datetime.datetime.strptime(date_text, date_format)


# Simple query example:
#
# {
# "collection" : "my_collection",
# "query" : {
# "date" : {
# "$gt" : "ISODate(\"2015-01-15 11:41\")"
# },
# "type" : 1
# },
# "fields" : {
# "_id" : 1,
# "name" : 2
# },
# "sort" : [
# {
# "name" : "date",
# "direction" : -1
# }
# ]
#
# }
#
#
# Aggregation
# ===========
# Uses a syntax similar to the one used in PyMongo. However, to preserve the
# order of the sort keys, the "$sort" operation is written as a regular list,
# which is converted into a SON (ordered dictionary) object before execution.
#
# Aggregation query example:
#
# {
# "collection" : "things",
# "aggregate" : [
# {
# "$unwind" : "$tags"
# },
# {
# "$group" : {
# "_id" : "$tags",
# "count" : { "$sum" : 1 }
# }
# },
# {
# "$sort" : [
# {
# "name" : "count",
# "direction" : -1
# },
# {
# "name" : "_id",
# "direction" : -1
# }
# ]
# }
# ]
# }
#
#
class MongoDB(BaseQueryRunner):
@classmethod
def configuration_schema(cls):
Expand Down Expand Up @@ -89,6 +132,22 @@ def __init__(self, configuration_json):

self.is_replica_set = True if "replicaSetName" in self.configuration and self.configuration["replicaSetName"] else False

def _get_column_by_name(self, columns, column_name):
for c in columns:
if "name" in c and c["name"] == column_name:
return c

return None


def _convert_date(self, q, field_name):
    """Replace an ISODate("...") string stored in q[field_name] with a datetime.

    The value is matched against the module-level date_regex. When it does
    not match, q is left untouched. When it does, the captured text is parsed
    as "YYYY-MM-DD", "YYYY-MM-DD HH:MM" or "YYYY-MM-DD HH:MM:SS" and the
    resulting naive datetime is written back into q in place.

    Raises ValueError (from strptime) when the captured text fits none of
    these formats.
    """
    matches = date_regex.findall(q[field_name])
    if not matches:
        return

    date_text = matches[0]

    # Pick the format from the captured date text itself, so that a colon
    # elsewhere in the field value cannot select the wrong format.
    colon_count = date_text.count(":")
    if colon_count == 0:
        date_format = "%Y-%m-%d"
    elif colon_count == 1:
        date_format = "%Y-%m-%d %H:%M"
    else:
        date_format = "%Y-%m-%d %H:%M:%S"

    # Parse straight into a datetime. Going through time.mktime() and
    # fromtimestamp() interprets the value as local time, which can shift it
    # around DST transitions and fail for dates outside the platform's
    # timestamp range.
    q[field_name] = datetime.datetime.strptime(date_text, date_format)

def run_query(self, query):
if self.is_replica_set:
db_connection = pymongo.MongoReplicaSetClient(self.configuration["connectionString"], replicaSet=self.configuration["replicaSetName"])
Expand Down Expand Up @@ -119,13 +178,34 @@ def run_query(self, query):
for k in q:
if q[k] and type(q[k]) in [str, unicode]:
logging.debug(q[k])
_convert_date(q, k)
self._convert_date(q, k)
elif q[k] and type(q[k]) is dict:
for k2 in q[k]:
if type(q[k][k2]) in [str, unicode]:
_convert_date(q[k], k2)
self._convert_date(q[k], k2)

f = None

aggregate = None
if "aggregate" in query_data:
aggregate = query_data["aggregate"]
for step in aggregate:
if "$sort" in step:
sort_list = []
for sort_item in step["$sort"]:
sort_list.append((sort_item["name"], sort_item["direction"]))

step["$sort"] = SON(sort_list)

if aggregate:
pass
else:
s = None
if "sort" in query_data and query_data["sort"]:
s = []
for field in query_data["sort"]:
s.append((field["name"], field["direction"]))

if "fields" in query_data:
f = query_data["fields"]

Expand All @@ -141,17 +221,26 @@ def run_query(self, query):
error = None
json_data = None

if s:
cursor = db[collection].find(q, f).sort(s)
else:
cursor = db[collection].find(q, f)
cursor = None
if q or (not q and not aggregate):
if s:
cursor = db[collection].find(q, f).sort(s)
else:
cursor = db[collection].find(q, f)

if "skip" in query_data:
cursor = cursor.skip(query_data["skip"])

if "limit" in query_data:
cursor = cursor.limit(query_data["limit"])

if "limit" in query_data and query_data["limit"]:
cursor = cursor.limit(query_data["limit"])
elif aggregate:
r = db[collection].aggregate(aggregate)
cursor = r["result"]

for r in cursor:
for k in r:
if _get_column_by_name(columns, k) is None:
if self._get_column_by_name(columns, k) is None:
columns.append({
"name": k,
"friendly_name": k,
Expand All @@ -167,7 +256,7 @@ def run_query(self, query):
if f:
ordered_columns = []
for k in sorted(f, key=f.get):
ordered_columns.append(_get_column_by_name(columns, k))
ordered_columns.append(self._get_column_by_name(columns, k))

columns = ordered_columns

Expand Down

0 comments on commit 59b87ec

Please sign in to comment.