This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit d180e1d: ./pep-it.sh changes
richardTowers committed Aug 29, 2018 (1 parent: 348c48d)
Showing 4 changed files with 42 additions and 24 deletions.
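
The contents of ./pep-it.sh are not part of this commit, so what it runs is an assumption; the purely mechanical line wrapping in the diffs below is consistent with a PEP 8 auto-formatter such as autopep8. A minimal sketch of applying that kind of fix programmatically, assuming autopep8 and using a hypothetical over-long line:

    # Minimal sketch, assuming the formatter behind ./pep-it.sh is autopep8
    # (an assumption; the script itself is not shown in this commit).
    import autopep8

    # A hypothetical over-long line, similar to the ones wrapped in this diff.
    source = (
        "def delete_record(self, data_set_id, record_id):\n"
        "    with self.connection.cursor() as cursor:\n"
        "        cursor.execute(create_delete_record_query("
        "cursor.mogrify, data_set_id, record_id))\n"
    )

    # fix_code rewrites the source towards PEP 8 compliance, including wrapping
    # lines longer than the default 79-character limit where it safely can.
    print(autopep8.fix_code(source))
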
backdrop/core/storage/postgres.py (22 additions & 11 deletions)
@@ -42,20 +42,23 @@ def data_set_exists(self, data_set_id):
         # in that it will return False if `create_data_set` has
         # been called, but no records have been saved.
         with self.connection.cursor() as cursor:
-            cursor.execute(create_data_set_exists_query(cursor.mogrify, data_set_id))
+            cursor.execute(
+                create_data_set_exists_query(cursor.mogrify, data_set_id))
             return cursor.rowcount > 0

     def create_data_set(self, data_set_id, size):
         pass

     def delete_data_set(self, data_set_id):
         with self.connection.cursor() as cursor:
-            cursor.execute(create_delete_data_set_query(cursor.mogrify, data_set_id))
+            cursor.execute(
+                create_delete_data_set_query(cursor.mogrify, data_set_id))
         self.connection.commit()

     def get_last_updated(self, data_set_id):
         with self.connection.cursor() as cursor:
-            cursor.execute(create_get_last_updated_query(cursor.mogrify, data_set_id))
+            cursor.execute(
+                create_get_last_updated_query(cursor.mogrify, data_set_id))

             if cursor.rowcount == 0:
                 return None
@@ -66,11 +69,14 @@ def get_last_updated(self, data_set_id):
     def batch_last_updated(self, data_sets):
         collections = [collection.name for collection in data_sets]
         with self.connection.cursor() as cursor:
-            cursor.execute(create_batch_last_updated_query(cursor.mogrify, collections))
+            cursor.execute(
+                create_batch_last_updated_query(cursor.mogrify, collections))
             results = cursor.fetchall()
-            timestamp_by_collection = {collection: max_timestamp for [collection, max_timestamp] in results}
+            timestamp_by_collection = {
+                collection: max_timestamp for [collection, max_timestamp] in results}
             for data_set in data_sets:
-                data_set._last_updated = timestamp_by_collection[data_set.name].replace(tzinfo=pytz.UTC)
+                data_set._last_updated = timestamp_by_collection[
+                    data_set.name].replace(tzinfo=pytz.UTC)

     def empty_data_set(self, data_set_id):
         self.delete_data_set(data_set_id)
@@ -80,7 +86,8 @@ def save_record(self, data_set_id, record):

     def find_record(self, data_set_id, record_id):
         with self.connection.cursor() as cursor:
-            cursor.execute(create_find_record_query(cursor.mogrify, data_set_id, record_id))
+            cursor.execute(
+                create_find_record_query(cursor.mogrify, data_set_id, record_id))
             (record,) = cursor.fetchone()
             return _parse_datetime_fields(record)

@@ -90,17 +97,20 @@ def update_record(self, data_set_id, record_id, record):
         ts = record['_timestamp'] if '_timestamp' in record else updated_at

         with self.connection.cursor() as cursor:
-            cursor.execute(create_update_record_query(cursor.mogrify, data_set_id, record, record_id, ts, updated_at))
+            cursor.execute(create_update_record_query(
+                cursor.mogrify, data_set_id, record, record_id, ts, updated_at))
         self.connection.commit()

     def delete_record(self, data_set_id, record_id):
         with self.connection.cursor() as cursor:
-            cursor.execute(create_delete_record_query(cursor.mogrify, data_set_id, record_id))
+            cursor.execute(
+                create_delete_record_query(cursor.mogrify, data_set_id, record_id))
         self.connection.commit()

     def execute_query(self, data_set_id, query):
         with self.connection.cursor() as cursor:
-            pg_query, convert_query_result_to_dictionaries = create_sql_query(cursor.mogrify, data_set_id, query)
+            pg_query, convert_query_result_to_dictionaries = create_sql_query(
+                cursor.mogrify, data_set_id, query)
             cursor.execute(pg_query)
             records = convert_query_result_to_dictionaries(cursor.fetchall())
             return [_parse_datetime_fields(record) for record in records]
@@ -116,7 +126,8 @@ def _parse_datetime_fields(obj):
     obj_copy = obj.copy()
     for field in ['_updated_at', '_timestamp']:
         if field in obj:
-            obj_copy[field] = dateutil.parser.parse(obj[field]).replace(tzinfo=pytz.UTC)
+            obj_copy[field] = dateutil.parser.parse(
+                obj[field]).replace(tzinfo=pytz.UTC)
     for key, value in obj.iteritems():
         if isinstance(value, datetime):
             obj_copy[key] = value.replace(tzinfo=pytz.UTC)
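
A pattern repeated throughout postgres.py is passing psycopg2's cursor.mogrify into the query factories, so the factories only assemble SQL text while parameter escaping stays with the database driver. A minimal sketch of that pattern; the DSN, table name, and factory name below are hypothetical, not taken from this repository:

    import psycopg2

    # Hypothetical factory mimicking the shape of the create_*_query helpers
    # defined in backdrop/core/storage/sql_query_factory.py.
    def build_find_record_query(mogrify, data_set_id, record_id):
        # mogrify returns the SQL string with the parameters safely escaped.
        return mogrify(
            "SELECT record FROM records "
            "WHERE collection = %(collection)s AND id = %(id)s",
            {'collection': data_set_id, 'id': record_id})

    # Hypothetical DSN; the real connection settings are not shown in this diff.
    connection = psycopg2.connect("dbname=backdrop_example")
    with connection.cursor() as cursor:
        cursor.execute(
            build_find_record_query(cursor.mogrify, 'some_data', 'test_id_42'))
        row = cursor.fetchone()
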
backdrop/core/storage/sql_query_factory.py (12 additions & 5 deletions)
@@ -110,6 +110,7 @@ def create_batch_last_updated_query(mogrify, collections):
         collections
     )

+
 def create_sql_query(mogrify, data_set_id, user_query):
     """
     Creates a sql query and a funtion which transforms the output into a list
@@ -170,7 +171,8 @@ def _create_grouped_sql_query(mogrify, data_set_id, user_query):
     field_group_by_column_name = _get_field_group_columns(mogrify, user_query)
     collect_column_by_column_name = _get_collect_columns(mogrify, user_query)

-    group_columns = period_group_by_column_name.values() + field_group_by_column_name.values()
+    group_columns = period_group_by_column_name.values(
+    ) + field_group_by_column_name.values()

     columns = (
         ['count(*)'] +
@@ -181,7 +183,8 @@ def _create_grouped_sql_query(mogrify, data_set_id, user_query):
         [mogrify('collection=%(collection)s', {'collection': data_set_id})] +
         _get_where_conditions(mogrify, user_query) +
         _get_time_limit_conditions(mogrify, user_query) +
-        _get_field_group_not_null_conditions(field_group_by_column_name.values())
+        _get_field_group_not_null_conditions(
+            field_group_by_column_name.values())
     )
     query_tokens = [
         'SELECT',
@@ -196,7 +199,9 @@ def _create_grouped_sql_query(mogrify, data_set_id, user_query):
     sql_query = ' '.join([token for token in query_tokens if token])

     def translate_results(rows):
-        key_by_index = ['_count'] + period_group_by_column_name.keys() + field_group_by_column_name.keys() + collect_column_by_column_name.keys()
+        key_by_index = ['_count'] + period_group_by_column_name.keys() + \
+            field_group_by_column_name.keys() + \
+            collect_column_by_column_name.keys()
         return [_translate_row(row, key_by_index) for row in rows]

     return sql_query, translate_results
@@ -227,14 +232,16 @@ def _get_field_group_not_null_conditions(field_group_columns):
 def _get_period_columns(mogrify, user_query):
     if user_query.period:
         key = '_{}_start_at'.format(user_query.period.name)
-        # There will only ever be one period, so a normal (unordered) dict is fine here
+        # There will only ever be one period, so a normal (unordered) dict is
+        # fine here
         return {key: mogrify("date_trunc(%(period)s, timestamp)", {'period': user_query.period.name})}
     return {}


 def _get_collect_columns(mogrify, user_query):
     return OrderedDict(
-        (collect, mogrify("array_agg(record->%(collect)s)", {'collect': collect}))
+        (collect, mogrify(
+            "array_agg(record->%(collect)s)", {'collect': collect}))
         for collect, _ in (user_query.collect or [])
     )

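
The key_by_index list built inside translate_results relies on the column dictionaries preserving insertion order (hence the OrderedDict in _get_collect_columns), because result rows come back positionally and have to be zipped back onto their keys. A small illustration of that mapping, using hypothetical column names and row values:

    # Hypothetical positional row and key list, illustrating how a grouped SQL
    # result row is translated back into a keyed record (names and values invented).
    key_by_index = ['_count', '_week_start_at', 'channel']
    row = (42, '2018-08-27T00:00:00', ['web', 'phone'])

    record = dict(zip(key_by_index, row))
    # {'_count': 42, '_week_start_at': '2018-08-27T00:00:00',
    #  'channel': ['web', 'phone']}
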
backdrop/core/storage/storage_factory.py (2 additions & 1 deletion)
@@ -13,6 +13,7 @@ def create_storage_engine(config):
     elif database_engine is 'postgres':
         storage = PostgresStorageEngine(database_url)
     else:
-        raise NotImplementedError('Database engine not implemented "%s"' % database_engine)
+        raise NotImplementedError(
+            'Database engine not implemented "%s"' % database_engine)

     return storage
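
create_storage_engine dispatches on a configured engine name. A sketch of calling it follows; the configuration key names are assumptions, since the code that reads them sits above the lines shown in this hunk:

    from backdrop.core.storage.storage_factory import create_storage_engine

    # Hypothetical configuration; the key names DATABASE_ENGINE and DATABASE_URL
    # are assumptions, not taken from the lines shown above.
    config = {
        'DATABASE_ENGINE': 'postgres',
        'DATABASE_URL': 'postgres://localhost/backdrop_example',
    }
    storage = create_storage_engine(config)
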
tests/core/storage/test_storage.py (6 additions & 7 deletions)
@@ -331,15 +331,15 @@ def test_batch_last_updated(self):
         records = {
             # timestamps in ascending order
             'some_data': [
-                {'_id':'test_id_42', '_timestamp': d_tz(2018, 1, 1)},
-                {'_id':'test_id_43', '_timestamp': d_tz(2019, 1, 1)},
-                {'_id':'test_id_44', '_timestamp': d_tz(2020, 1, 1)},
+                {'_id': 'test_id_42', '_timestamp': d_tz(2018, 1, 1)},
+                {'_id': 'test_id_43', '_timestamp': d_tz(2019, 1, 1)},
+                {'_id': 'test_id_44', '_timestamp': d_tz(2020, 1, 1)},
             ],
             # timestamps in descending order
             'some_other_data': [
-                {'_id':'test_id_45', '_timestamp': d_tz(2017, 1, 1)},
-                {'_id':'test_id_46', '_timestamp': d_tz(2016, 1, 1)},
-                {'_id':'test_id_47', '_timestamp': d_tz(2015, 1, 1)},
+                {'_id': 'test_id_45', '_timestamp': d_tz(2017, 1, 1)},
+                {'_id': 'test_id_46', '_timestamp': d_tz(2016, 1, 1)},
+                {'_id': 'test_id_47', '_timestamp': d_tz(2015, 1, 1)},
             ]
         }
@@ -358,4 +358,3 @@ def test_batch_last_updated(self):

         assert_that(some_data_set_last_updated, is_(d_tz(2020, 1, 1, 0, 0, 0)))
         assert_that(some_other_data_set_last_updated, is_(d_tz(2017, 1, 1, 0, 0, 0)))
-
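
The d_tz helper used in these fixtures is imported from the test support code rather than shown here. A plausible minimal equivalent (an assumption, the real helper may differ) builds a UTC-aware datetime, matching the replace(tzinfo=pytz.UTC) normalisation done by the storage engine:

    from datetime import datetime

    import pytz

    # Hypothetical stand-in for the d_tz test helper used in the fixtures above.
    def d_tz(year, month, day, hour=0, minute=0, second=0):
        # Build a datetime and attach UTC so comparisons against the storage
        # engine's timezone-aware timestamps behave consistently.
        return datetime(year, month, day, hour, minute, second, tzinfo=pytz.UTC)

    assert d_tz(2020, 1, 1) == datetime(2020, 1, 1, tzinfo=pytz.UTC)
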