Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
wip: Implement PostgresStorageEngine
Browse files Browse the repository at this point in the history
Not passing the tests yet...
  • Loading branch information
richardTowers committed Aug 22, 2018
1 parent 79d7e6a commit 05804ce
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 60 deletions.
89 changes: 65 additions & 24 deletions backdrop/core/storage/postgres.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,93 @@
class PostgresStorageEngine(object):
import psycopg2
import json
from datetime import date, datetime
from .. import timeutils

def __init__(self):
pass
def json_serial(obj):
    """JSON serializer for objects json.dumps cannot handle natively.

    Dates and datetimes are rendered as ISO 8601 strings; any other
    type is rejected with a TypeError, mirroring json.dumps' default.
    """
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError("Type %s not serializable" % type(obj))

def alive(self):
    """Health-check hook.

    Still a stub (TODO): returns None instead of probing the
    database connection.
    """
    return None
class PostgresStorageEngine(object):

def __init__(self, database_url):
    """Open a connection to the Postgres instance at `database_url`.

    Fix: the parameter was misspelled `datatbase_url`. The connection
    is held open for the engine's lifetime; `database_url` is a
    libpq-style URL, e.g. postgres://user:pass@host:5432.
    """
    self.connection = psycopg2.connect(database_url)

def data_set_exists(self, data_set_id):
    """Return True if at least one record exists for `data_set_id`.

    This is slightly different to the mongo implementation in that it
    returns False if `create_data_set` has been called but no records
    have been saved — a data set only materialises once a row exists.
    """
    with self.connection.cursor() as psql_cursor:
        psql_cursor.execute("""
            SELECT 1 FROM mongo
            WHERE collection=%(collection)s
            LIMIT 1
            """,
            {'collection': data_set_id}
        )
        row = psql_cursor.fetchone()
    return row is not None

def create_data_set(self, data_set_id, size):
    """Create a (possibly capped) data set.

    Still a stub (TODO): rows are written lazily by save_record, so
    nothing happens here yet; `size` (the cap) is ignored for now.
    """
    return None

def delete_data_set(self, data_set_id):
    """Delete every record belonging to `data_set_id`, then commit."""
    with self.connection.cursor() as psql_cursor:
        psql_cursor.execute(
            "DELETE FROM mongo WHERE collection=%(collection)s",
            {'collection': data_set_id}
        )
    self.connection.commit()

def get_last_updated(self, data_set_id):
    """Most recent `_updated_at` timestamp in the data set.

    Still a stub (TODO): requires sorting on the JSON column;
    returns None for now.
    """
    return None

def batch_last_updated(self, data_sets):
    """Annotate each of `data_sets` with its last-updated time.

    Still a stub (TODO): requires quite a complex grouped query;
    returns None for now.
    """
    return None

def empty_data_set(self, data_set_id):
    """Remove all records from `data_set_id`.

    Since a data set has no stored representation of its own here,
    emptying it is the same as deleting it.

    Fix: the original called the bare name `delete_data_set`, which
    raises NameError — the method must be looked up on `self`.
    """
    self.delete_data_set(data_set_id)

def save_record(self, data_set_id, record):
    """Insert or update `record` in the data set, keyed on its `_id`."""
    record_id = record['_id']
    self.update_record(data_set_id, record_id, record)

def find_record(self, data_set_id, record_id):
    """Fetch a single record by id, or None if it does not exist.

    Fixes two defects in the original:
      * update_record stores rows keyed as
        `data_set_id + ':' + record_id`, but the lookup here used the
        bare record_id, so saved records could never be found;
      * fetchone() returns None on a miss, which the original
        unconditionally unpacked, raising TypeError.
    """
    with self.connection.cursor() as psql_cursor:
        psql_cursor.execute("""
            SELECT record FROM mongo
            WHERE id=%(id)s
            """,
            {'id': data_set_id + ':' + record_id}
        )
        row = psql_cursor.fetchone()
    if row is None:
        return None
    (record,) = row
    # NOTE(review): `record` is whatever psycopg2 yields for the column —
    # a dict if the column type is json/jsonb, else the raw JSON text.
    # Confirm against the `mongo` table schema.
    return record

def update_record(self, data_set_id, record_id, record):
    """Upsert `record` under `data_set_id`, then commit.

    Side effect: stamps `_updated_at` onto the caller's dict before
    serialising. Rows are keyed on `data_set_id + ':' + record_id` so
    record ids only need to be unique within their data set.
    """
    record['_updated_at'] = timeutils.now()
    prefixed_id = data_set_id + ':' + record_id
    serialised = json.dumps(record, default=json_serial)
    with self.connection.cursor() as psql_cursor:
        psql_cursor.execute("""
            INSERT INTO mongo (id, collection, record)
            VALUES (%(id)s, %(collection)s, %(record)s)
            ON CONFLICT (id) DO UPDATE SET record=%(record)s""",
            {
                'id': prefixed_id,
                'collection': data_set_id,
                'record': serialised,
            }
        )
    self.connection.commit()

def delete_record(self, data_set_id, record_id):
    """Delete a single record from the data set, then commit.

    Fix: rows are stored under `data_set_id + ':' + record_id` (see
    update_record), but the original deleted by the bare record_id,
    so existing records were never actually removed.
    """
    with self.connection.cursor() as psql_cursor:
        psql_cursor.execute(
            "DELETE FROM mongo WHERE id=%(id)s",
            {'id': data_set_id + ':' + record_id}
        )
    self.connection.commit()

def execute_query(self, data_set_id, query):
    """Run `query` against the data set.

    Still a stub (TODO): filtering, grouping and sorting are not yet
    implemented, so every query yields an empty result list.
    """
    results = []
    return results
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ logstash_formatter==0.5.7
Werkzeug==0.12.2
redis==2.10.6
flower==0.9.2
psycopg2==2.7.5
10 changes: 10 additions & 0 deletions tests/core/storage/test_postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from backdrop.core.storage.postgres import PostgresStorageEngine

from .test_storage import BaseStorageTest

class TestPostgresStorageEngine(BaseStorageTest):
    """Runs the shared storage-engine contract tests against Postgres."""

    def setup(self):
        # NOTE(review): connection details are hard-coded to a local dev
        # database — consider reading them from the environment.
        self.engine = PostgresStorageEngine('postgres://postgres:mysecretpassword@localhost:5432')

    def teardown(self):
        # The contract tests write into the 'foo_bar' data set;
        # clear it out between runs.
        self.engine.delete_data_set('foo_bar')
72 changes: 36 additions & 36 deletions tests/core/storage/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_create_and_delete(self):
assert_that(self.engine.data_set_exists('foo_bar'), is_(False))

def test_simple_saving_and_finding(self):
    # A saved record should come back from an unfiltered query.
    self._save_all('foo_bar', {'_id': 'test_id_1', 'foo': 'bar'})

    query_result = self.engine.execute_query('foo_bar', Query.create())
    assert_that(query_result, contains(has_entries({'foo': 'bar'})))
Expand All @@ -73,9 +73,9 @@ def test_saving_a_record_adds_an_updated_at(self):
def test_get_last_updated(self):
    # The later of the two save timestamps should win, regardless of
    # the order the records were written in.
    self.engine.create_data_set('foo_bar', 0)
    with freeze_time('2012-12-12'):
        self.engine.save_record('foo_bar', {'_id': 'test_id_2', 'foo': 'first'})
    with freeze_time('2012-11-12'):
        self.engine.save_record('foo_bar', {'_id': 'test_id_3', 'foo': 'second'})

    last_updated = self.engine.get_last_updated('foo_bar')
    assert_that(last_updated, is_(d_tz(2012, 12, 12)))
Expand All @@ -97,15 +97,15 @@ def test_capped_data_set_is_capped(self):
def test_capped_data_set_is_capped(self):
    self.engine.create_data_set('foo_bar', 1)

    # Fix: the records need DISTINCT ids — reusing a single '_id' made
    # every save an upsert of the same record, so the data set only
    # ever held one row and the capping assertion was vacuous.
    for i in range(100):
        self.engine.save_record('foo_bar', {'_id': 'test_id_4_%d' % i, 'foo': i})

    assert_that(
        len(self.engine.execute_query('foo_bar', Query.create())),
        less_than(70))

def test_empty_a_data_set(self):
self._save_all('foo_bar',
{'foo': 'bar'}, {'bar': 'foo'})
{'_id':'test_id_5','foo': 'bar'}, {'_id':'test_id_6','bar': 'foo'})

assert_that(len(self.engine.execute_query('foo_bar', Query.create())), is_(2))

Expand Down Expand Up @@ -136,7 +136,7 @@ def test_delete_record(self):

def test_datetimes_are_returned_as_utc(self):
self._save_all('foo_bar',
{'_timestamp': datetime.datetime(2012, 8, 8)})
{'_id':'test_id_7','_timestamp': datetime.datetime(2012, 8, 8)})

results = self.engine.execute_query('foo_bar', Query.create())

Expand All @@ -145,7 +145,7 @@ def test_datetimes_are_returned_as_utc(self):
has_entries({'_timestamp': d_tz(2012, 8, 8)})))

def test_query_with_filter(self):
self._save_all('foo_bar', {'foo': 'bar'}, {'foo': 'foo'})
self._save_all('foo_bar', {'_id':'test_id_8','foo': 'bar'}, {'_id':'test_id_9','foo': 'foo'})

results = self.engine.execute_query('foo_bar', Query.create(
filter_by=[('foo', 'bar')]))
Expand All @@ -156,9 +156,9 @@ def test_query_with_filter(self):

def test_basic_query_with_time_limits(self):
self._save_all('foo_bar',
{'_timestamp': d_tz(2012, 12, 12)},
{'_timestamp': d_tz(2012, 12, 14)},
{'_timestamp': d_tz(2012, 12, 11)})
{'_id':'test_id_10','_timestamp': d_tz(2012, 12, 12)},
{'_id':'test_id_11','_timestamp': d_tz(2012, 12, 14)},
{'_id':'test_id_12','_timestamp': d_tz(2012, 12, 11)})

# start at
results = self.engine.execute_query('foo_bar', Query.create(
Expand Down Expand Up @@ -187,8 +187,8 @@ def test_basic_query_with_time_limits(self):

def test_basic_query_with_sort_ascending(self):
self._save_all('foo_bar',
{'foo': 'mug'},
{'foo': 'book'})
{'_id':'test_id_13','foo': 'mug'},
{'_id':'test_id_14','foo': 'book'})

results = self.engine.execute_query('foo_bar', Query.create(
sort_by=('foo', 'ascending')))
Expand All @@ -200,8 +200,8 @@ def test_basic_query_with_sort_ascending(self):

def test_basic_query_with_sort_descending(self):
self._save_all('foo_bar',
{'foo': 'mug'},
{'foo': 'book'})
{'_id':'test_id_15','foo': 'mug'},
{'_id':'test_id_16','foo': 'book'})

results = self.engine.execute_query('foo_bar', Query.create(
sort_by=('foo', 'descending')))
Expand All @@ -212,7 +212,7 @@ def test_basic_query_with_sort_descending(self):
has_entry('foo', 'book')))

def test_basic_query_with_limit(self):
self._save_all('foo_bar', {'foo': 'bar'}, {'foo': 'foo'})
self._save_all('foo_bar', {'_id':'test_id_17','foo': 'bar'}, {'_id':'test_id_18','foo': 'foo'})

results = self.engine.execute_query('foo_bar', Query.create(limit=1))

Expand All @@ -221,8 +221,8 @@ def test_basic_query_with_limit(self):
# !GROUPED!
def test_query_grouped_by_field(self):
self._save_all('foo_bar',
{'foo': 'foo'}, {'foo': 'foo'},
{'foo': 'bar'})
{'_id':'test_id_19','foo': 'foo'}, {'_id':'test_id_20','foo': 'foo'},
{'_id':'test_id_21','foo': 'bar'})

results = self.engine.execute_query('foo_bar', Query.create(
group_by=['foo']))
Expand All @@ -235,9 +235,9 @@ def test_query_grouped_by_field(self):
def test_query_grouped_by_period(self):
self._save_all_with_periods(
'foo_bar',
{'_timestamp': d_tz(2012, 12, 12, 12)},
{'_timestamp': d_tz(2012, 12, 12, 15)},
{'_timestamp': d_tz(2012, 12, 13, 12)})
{'_id':'test_id_22','_timestamp': d_tz(2012, 12, 12, 12)},
{'_id':'test_id_23','_timestamp': d_tz(2012, 12, 12, 15)},
{'_id':'test_id_24','_timestamp': d_tz(2012, 12, 13, 12)})

results = self.engine.execute_query('foo_bar', Query.create(
period=DAY))
Expand All @@ -252,10 +252,10 @@ def test_query_grouped_by_period(self):
def test_group_by_field_and_period(self):
self._save_all_with_periods(
'foo_bar',
{'_timestamp': d_tz(2012, 12, 12), 'foo': 'foo'},
{'_timestamp': d_tz(2012, 12, 13), 'foo': 'foo'},
{'_timestamp': d_tz(2012, 12, 12), 'foo': 'bar'},
{'_timestamp': d_tz(2012, 12, 12), 'foo': 'bar'})
{'_id':'test_id_25','_timestamp': d_tz(2012, 12, 12), 'foo': 'foo'},
{'_id':'test_id_26','_timestamp': d_tz(2012, 12, 13), 'foo': 'foo'},
{'_id':'test_id_27','_timestamp': d_tz(2012, 12, 12), 'foo': 'bar'},
{'_id':'test_id_28','_timestamp': d_tz(2012, 12, 12), 'foo': 'bar'})

results = self.engine.execute_query('foo_bar', Query.create(
group_by=['foo'], period=DAY))
Expand All @@ -268,8 +268,8 @@ def test_group_by_field_and_period(self):

def test_group_query_with_collect_fields(self):
self._save_all('foo_bar',
{'foo': 'foo', 'c': 1}, {'foo': 'foo', 'c': 3},
{'foo': 'bar', 'c': 2})
{'_id':'test_id_29','foo': 'foo', 'c': 1}, {'_id':'test_id_30','foo': 'foo', 'c': 3},
{'_id':'test_id_31','foo': 'bar', 'c': 2})

results = self.engine.execute_query('foo_bar', Query.create(
group_by=['foo'], collect=[('c', 'sum')]))
Expand All @@ -281,10 +281,10 @@ def test_group_query_with_collect_fields(self):

def test_group_and_collect_with_false_values(self):
self._save_all('foo_bar',
{'foo': 'one', 'bar': False},
{'foo': 'two', 'bar': True},
{'foo': 'two', 'bar': True},
{'foo': 'one', 'bar': False})
{'_id':'test_id_32','foo': 'one', 'bar': False},
{'_id':'test_id_33','foo': 'two', 'bar': True},
{'_id':'test_id_34','foo': 'two', 'bar': True},
{'_id':'test_id_35','foo': 'one', 'bar': False})

results = self.engine.execute_query('foo_bar', Query.create(
group_by=['foo'], collect=[('bar', 'sum')]))
Expand All @@ -296,9 +296,9 @@ def test_group_and_collect_with_false_values(self):

def test_group_query_ignores_records_without_grouping_key(self):
self._save_all('foo_bar',
{'foo': 'one'},
{'foo': 'two'},
{'bar': 'one'})
{'_id':'test_id_36','foo': 'one'},
{'_id':'test_id_37','foo': 'two'},
{'_id':'test_id_38','bar': 'one'})

results = self.engine.execute_query('foo_bar', Query.create(
group_by=['foo']))
Expand All @@ -310,9 +310,9 @@ def test_group_query_ignores_records_without_grouping_key(self):

def test_basic_query_with_inclusive_time_limits(self):
self._save_all('foo_bar',
{'_timestamp': d_tz(2014, 12, 01)},
{'_timestamp': d_tz(2014, 12, 02)},
{'_timestamp': d_tz(2014, 12, 03)})
{'_id':'test_id_39','_timestamp': d_tz(2014, 12, 01)},
{'_id':'test_id_40','_timestamp': d_tz(2014, 12, 02)},
{'_id':'test_id_41','_timestamp': d_tz(2014, 12, 03)})

# start at
results = self.engine.execute_query('foo_bar', Query.create(
Expand Down

0 comments on commit 05804ce

Please sign in to comment.