Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Implement grouping by period
Browse files Browse the repository at this point in the history
  • Loading branch information
richardTowers committed Aug 23, 2018
1 parent db5145a commit 6285624
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 7 deletions.
49 changes: 44 additions & 5 deletions backdrop/core/storage/postgres.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import psycopg2
import psycopg2.extras
import json
from datetime import date, datetime
from .. import timeutils
Expand Down Expand Up @@ -51,7 +52,7 @@ def create_table_and_indices(self):
collection VARCHAR NOT NULL,
timestamp TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
record JSON NOT NULL
record JSONB NOT NULL
);
CREATE INDEX IF NOT EXISTS mongo_collection ON mongo (collection);
CREATE INDEX IF NOT EXISTS mongo_timestamp ON mongo (timestamp);
Expand Down Expand Up @@ -164,12 +165,50 @@ def delete_record(self, data_set_id, record_id):
self.connection.commit()

def execute_query(self, data_set_id, query):
    """Run ``query`` for ``data_set_id`` and return the fetched rows.

    Grouped queries (``query.is_grouped``) are executed with a
    ``RealDictCursor`` so every row comes back as a mapping keyed by the
    column alias; each ``datetime`` value in a row has its ``tzinfo``
    replaced with UTC. Ungrouped queries fetch single-column rows and pass
    each value through ``parse_datetime_fields``.

    :param data_set_id: identifier forwarded to the SQL builders.
    :param query: query object; ``is_grouped`` selects the code path.
    :returns: a list of row mappings (grouped) or a list of parsed records
        (ungrouped).
    """
    if query.is_grouped:
        dict_rows = psycopg2.extras.RealDictCursor
        with self.connection.cursor(cursor_factory=dict_rows) as psql_cursor:
            psql_cursor.execute(
                self._get_grouped_postgres_query(data_set_id, query))
            records = psql_cursor.fetchall()

        # NOTE(review): relies on a module-level ``pytz`` import that is not
        # visible in this excerpt -- confirm it exists.
        for record in records:
            # Iterate over a snapshot because values are reassigned inside
            # the loop; ``items()`` works on both Python 2 and Python 3,
            # whereas ``iteritems()`` is Python 2 only.
            for key, value in list(record.items()):
                if isinstance(value, datetime):
                    record[key] = value.replace(tzinfo=pytz.UTC)
        return records

    # Ungrouped path: each fetched row is a 1-tuple holding the record.
    with self.connection.cursor() as psql_cursor:
        psql_cursor.execute(
            self._get_basic_postgres_query(data_set_id, query))
        records = psql_cursor.fetchall()
    return [parse_datetime_fields(record) for (record,) in records]

def _get_postgres_query(self, data_set_id, query):
    """Build the SQL text for ``query``.

    Delegates to the grouped builder when ``query.is_grouped`` is truthy and
    to the basic builder otherwise, forwarding both arguments unchanged.
    """
    build_sql = (
        self._get_grouped_postgres_query
        if query.is_grouped
        else self._get_basic_postgres_query
    )
    return build_sql(data_set_id, query)

def _get_grouped_postgres_query(self, data_set_id, query):
    """Build the SQL text for a grouped (count per period) query.

    The statement always selects ``count(*) as _count``. When
    ``query.period`` is set it also selects the period expression returned
    by ``_get_period_group`` and groups by that alias.

    NOTE(review): ``data_set_id`` is not referenced in this method, so the
    generated SQL carries no ``collection`` filter -- confirm whether that
    is intended.
    """
    with self.connection.cursor() as psql_cursor:
        select_columns = ["count(*) as _count"]
        period_column = self._get_period_group(psql_cursor, query)
        if period_column:
            select_columns.append(period_column)

        clauses = ["SELECT", ", ".join(select_columns), "FROM mongo"]
        if query.period:
            # query.period.name is not user input, so no need to mogrify this
            clauses.append("GROUP BY _%s_start_at" % query.period.name)
        return " ".join(clauses)

def _get_period_group(self, psql_cursor, query):
    """Return the SELECT expression that buckets rows by ``query.period``.

    The result has the form
    ``date_trunc('<name>', timestamp) as _<name>_start_at`` where ``<name>``
    is ``query.period.name``. An empty string is returned when the query has
    no period. ``psql_cursor`` is accepted but not used here.
    """
    if not query.period:
        return ''

    # query.period.name is not user input, so no need to mogrify this
    period_name = query.period.name
    template = "date_trunc('{period}', timestamp) as _{period}_start_at"
    return template.format(period=period_name)


def _get_basic_postgres_query(self, data_set_id, query):
with self.connection.cursor() as psql_cursor:
where_conditions = self._get_where_conditions(query)
time_limit_conditions = self._get_time_limit_conditions(query)
Expand Down
23 changes: 21 additions & 2 deletions tests/core/storage/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
from backdrop.core.storage.postgres import PostgresStorageEngine
from .test_storage import BaseStorageTest
from backdrop.core.query import Query
from backdrop.core.timeseries import DAY, WEEK

def setup_module():
    """Create the storage table and indices before the module's tests run.

    Only one database is prepared: the connection URL below is the same one
    ``TestPostgresStorageEngine.setup`` connects to. The earlier call against
    ``postgres://localhost:5432/backdrop`` was superseded by this URL and is
    not used by any test in this module, so it is no longer issued.
    """
    connection_url = 'postgres://postgres:mysecretpassword@localhost:5432'
    PostgresStorageEngine(connection_url).create_table_and_indices()

class TestPostgresStorageEngine(BaseStorageTest):
def setup(self):
    """Attach a ``PostgresStorageEngine`` to the test case as ``self.engine``."""
    connection_url = 'postgres://postgres:mysecretpassword@localhost:5432'
    self.engine = PostgresStorageEngine(connection_url)

def test_get_postgres_query(self):
def test_get_basic_postgres_query(self):
result = self.engine._get_postgres_query(
'some-collection',
Query.create(filter_by = [('foo', 'bar')])
Expand All @@ -21,6 +22,24 @@ def test_get_postgres_query(self):
is_("SELECT record FROM mongo WHERE collection='some-collection' AND record ->> 'foo' = 'bar' LIMIT ALL")
)

def test_get_group_by_postgres_query(self):
    """Check the SQL generated for period-grouped queries (DAY, then WEEK)."""
    expectations = [
        (
            DAY,
            "SELECT count(*) as _count, date_trunc('day', timestamp) as _day_start_at FROM mongo GROUP BY _day_start_at",
        ),
        (
            WEEK,
            "SELECT count(*) as _count, date_trunc('week', timestamp) as _week_start_at FROM mongo GROUP BY _week_start_at",
        ),
    ]
    for period, expected_sql in expectations:
        generated_sql = self.engine._get_postgres_query(
            'some-collection',
            Query.create(period=period),
        )
        assert_that(generated_sql, is_(expected_sql))

@unittest.skip('The postgres datastore does not support the creation of empty datasets')
def test_create(self):
    """Deliberately empty; the decorator above skips this test.

    NOTE(review): presumably shadows a ``test_create`` inherited from
    ``BaseStorageTest`` -- confirm against that class.
    """
Expand Down

0 comments on commit 6285624

Please sign in to comment.