Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Refactoring and ensure we do not overwrite newer data
Browse files Browse the repository at this point in the history
Move is_latest_data out to util, use in tx transform

As we check the output rather than source data set for newer data we
will never overwrite newer data. This is more reliable for things with multiple potential
sources like digital takeup.

We currently test is_latest_data through the transforms using it. Arguably the test code around these calls needs to be moved to
test_utils so we are not repeating tests. It is worth keeping some component integration tests for sanity though.
  • Loading branch information
jcbashdown committed Jan 23, 2015
1 parent 5337f73 commit 62e0284
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 118 deletions.
40 changes: 4 additions & 36 deletions backdrop/transformers/tasks/latest_dataset_value.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import string

from .util import encode_id
from .util import encode_id, is_latest_data
from ..worker import config

from performanceplatform.client import AdminAPI, DataSet
from performanceplatform.client import AdminAPI

data_type_to_value_mappings = {
'completion-rate': 'rate',
Expand All @@ -12,39 +12,6 @@
}


def get_read_params(transform_params, latest_timestamp):
    """Build the backdrop read query for a newness check.

    Period-based transforms read the single most recent period bucket;
    all other transforms read from ``latest_timestamp`` onwards,
    newest record first.
    """
    if 'period' in transform_params:
        return {
            'duration': 1,
            'period': transform_params['period'],
        }
    return {
        'start_at': latest_timestamp,
        'sort_by': '_timestamp:descending',
    }


def is_latest_data(data_set_config, transform, latest_datum):
    """
    Read from backdrop to determine if new data is the latest.

    Returns False only when the data set named by ``data_set_config``
    already contains a record with a strictly newer ``_timestamp``
    than ``latest_datum``; otherwise returns True.
    """

    # Read from the data set identified by the config (data_group/data_type),
    # so the comparison is against what is already stored there.
    data_set = DataSet.from_group_and_type(
        config.BACKDROP_READ_URL,
        data_set_config['data_group'],
        data_set_config['data_type']
    )

    transform_params = transform.get('query_parameters', {})
    read_params = get_read_params(transform_params, latest_datum['_timestamp'])
    existing_data = data_set.get(query_parameters=read_params)

    # get_read_params asks for newest-first results, so inspecting the first
    # record suffices. NOTE(review): assumes timestamps are same-offset
    # ISO-8601 strings, which compare correctly lexically — the fixtures
    # elsewhere in this commit use that format.
    if existing_data['data']:
        if existing_data['data'][0]['_timestamp'] > latest_datum['_timestamp']:
            return False

    return True


def compute(new_data, transform, data_set_config):

# Sort the new data by timestamp and use the latest data point.
Expand All @@ -71,7 +38,8 @@ def compute(new_data, transform, data_set_config):
data_type = string.replace(data_set_config['data_type'], '-', '_')

for dashboard_config in configs:
if dashboard_config['published'] and latest_datum[value_key] is not None:
if(dashboard_config['published']
and latest_datum[value_key] is not None):
slug = dashboard_config['slug']
id = encode_id(slug, data_type)
latest_values.append({
Expand Down
117 changes: 69 additions & 48 deletions backdrop/transformers/tasks/latest_transaction_explorer_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from performanceplatform.client import AdminAPI

from .util import encode_id, group_by
from .util import encode_id, group_by, is_latest_data

required_data_points = [
REQUIRED_DATA_POINTS = [
"cost_per_transaction",
"digital_cost_per_transaction",
"digital_takeup",
Expand All @@ -13,57 +13,78 @@
"total_cost",
]

required_fields = [
REQUIRED_FIELDS = [
"_timestamp",
"end_at",
"period",
"service_id",
"type"
]

admin_api = AdminAPI(
config.STAGECRAFT_URL,
config.STAGECRAFT_OAUTH_TOKEN)

def compute(data, options, data_set_config=None):
    """Build backdrop-ready latest-value records for each tx dashboard.

    Groups *data* by service id, looks up the dashboard for each service
    via Stagecraft, and emits one stripped-down datum per required
    transaction-explorer metric.
    """

    admin_api = AdminAPI(
        config.STAGECRAFT_URL,
        config.STAGECRAFT_OAUTH_TOKEN)

    def get_latest_data_points(points):
        # In-place ascending sort by timestamp; callers take element 0.
        points.sort(key=lambda item: item['_timestamp'])
        return points

    def get_stripped_down_data_for_data_point_name_only(
            dashboard_config,
            latest_data_points,
            data_point_name):
        # Copy only the required fields plus the one metric of interest.
        most_recent_data = latest_data_points[0]
        fields = required_fields + [data_point_name]
        datum = {field: most_recent_data[field] for field in fields}
        datum['dashboard_slug'] = dashboard_config['slug']
        datum['_id'] = encode_id(datum['dashboard_slug'], data_point_name)
        return datum

    def service_ids():
        for service_id, service_data in group_by('service_id', data).items():
            yield service_id, get_latest_data_points(service_data)

    def dashboard_configs():
        for service_id, latest_data_points in service_ids():
            dashboard_config = admin_api.get_dashboard_by_tx_id(service_id)[0]
            if dashboard_config:
                yield dashboard_config, latest_data_points

    def build_data():
        results = []
        for dashboard_config, latest_data_points in dashboard_configs():
            for data_point_name in required_data_points:
                results.append(
                    get_stripped_down_data_for_data_point_name_only(
                        dashboard_config,
                        latest_data_points,
                        data_point_name))
        return results

    return build_data()

def _get_latest_data_points(data):
data.sort(key=lambda item: item['_timestamp'], reverse=True)
return data[0]


def _get_stripped_down_data_for_data_point_name_only(
        dashboard_config,
        latest_data_points,
        data_point_name):
    """Builds up backdrop ready datum
    for a single transaction explorer metric
    """
    # Keep only the shared required fields plus the one metric requested.
    new_data = {
        field: latest_data_points[field]
        for field in REQUIRED_FIELDS + [data_point_name]
    }
    new_data['dashboard_slug'] = dashboard_config['slug']
    new_data['_id'] = encode_id(
        new_data['dashboard_slug'],
        data_point_name)
    return new_data


def _service_ids_with_latest_data(data):
    """Yield ``(service_id, latest_datum)`` for each service in *data*."""
    grouped = group_by('service_id', data)
    for service_id, service_data in grouped.items():
        yield service_id, _get_latest_data_points(service_data)


def _dashboard_configs_with_latest_data(data):
    """Yield ``(dashboard_config, latest_datum)`` pairs for each service.

    Looks up each service's dashboard via Stagecraft. Services with no
    matching dashboard are skipped: previously an empty lookup result
    raised IndexError on ``[0]`` before the truthiness guard could run.
    """
    for service_id, latest_data in _service_ids_with_latest_data(data):
        dashboard_configs = admin_api.get_dashboard_by_tx_id(service_id)
        # Guard both an empty result list and a falsy first element.
        if dashboard_configs and dashboard_configs[0]:
            yield dashboard_configs[0], latest_data


def _get_data_points_for_each_tx_metric(data, transform, data_set_config):
    """Yield each tx-metric datum that is not older than stored output data."""
    # We check newness against the *output* data set rather than the tx
    # source: for example digital-takeup can arrive from several sources,
    # so the output set is the authoritative record of what is newest.
    output_data_set = {
        'data_group': transform['output']['data-group'],
        'data_type': transform['output']['data-type'],
    }
    for dashboard_config, latest_data in _dashboard_configs_with_latest_data(
            data):
        for data_point_name in REQUIRED_DATA_POINTS:
            datum = _get_stripped_down_data_for_data_point_name_only(
                dashboard_config, latest_data, data_point_name)
            slug_filter = {
                'filter_by': 'dashboard_slug:{0}'.format(
                    datum['dashboard_slug']),
            }
            if is_latest_data(output_data_set,
                              transform,
                              datum,
                              additional_read_params=slug_filter):
                yield datum


def compute(data, transform, data_set_config=None):
    """Return the list of tx-metric data points that are safe to write."""
    return list(
        _get_data_points_for_each_tx_metric(data, transform, data_set_config))
43 changes: 43 additions & 0 deletions backdrop/transformers/tasks/util.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from performanceplatform.client import DataSet

from ..worker import config

import base64

from collections import OrderedDict
Expand All @@ -23,3 +27,42 @@ def encode_id(*parts):
joined = '_'.join(parts)
joined_bytes = joined.encode('utf-8')
return base64.urlsafe_b64encode(joined_bytes)


def _get_read_params(transform_params, latest_timestamp):
read_params = {}
if 'period' in transform_params:
read_params['duration'] = 1
read_params['period'] = transform_params['period']
else:
read_params['start_at'] = latest_timestamp
read_params['sort_by'] = '_timestamp:descending'
return read_params


def is_latest_data(data_set_config,
                   transform,
                   latest_datum,
                   additional_read_params=None):
    """
    Read from backdrop to determine if new data is the latest.

    Returns False only when the data set named by ``data_set_config``
    already holds a record with a strictly newer ``_timestamp`` than
    ``latest_datum``; otherwise returns True.

    ``additional_read_params`` is an optional dict merged over the
    generated query parameters. It defaults to None rather than a
    mutable ``{}`` so one call cannot leak state into the next.
    """

    data_set = DataSet.from_group_and_type(
        config.BACKDROP_READ_URL,
        data_set_config['data_group'],
        data_set_config['data_type']
    )

    transform_params = transform.get('query_parameters', {})
    read_params = _get_read_params(
        transform_params, latest_datum['_timestamp'])
    # dict.update works on Python 2 and 3 alike; the previous
    # ``dict(a.items() + b.items())`` merge fails on Python 3, where
    # items() returns views that cannot be concatenated with ``+``.
    if additional_read_params:
        read_params.update(additional_read_params)
    existing_data = data_set.get(query_parameters=read_params)

    # Results come back newest-first (see _get_read_params), so only the
    # first record needs checking. NOTE(review): assumes same-offset
    # ISO-8601 timestamp strings, which compare correctly lexically.
    if existing_data['data']:
        if existing_data['data'][0]['_timestamp'] > latest_datum['_timestamp']:
            return False

    return True
28 changes: 14 additions & 14 deletions tests/transformers/fixtures/transactions_explorer_example_data.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[
{
"_day_start_at": "2011-04-01T00:00:00+00:00",
"_hour_start_at": "2011-04-01T00:00:00+00:00",
"_day_start_at": "2013-04-01T00:00:00+00:00",
"_hour_start_at": "2013-04-01T00:00:00+00:00",
"_id": "MjAxMS0wNC0wMSAwMDowMDowMDIwMTItMDQtMDEgMDA6MDA6MDBiaXMtYW5udWFsLXJldHVybnM=",
"_month_start_at": "2011-04-01T00:00:00+00:00",
"_quarter_start_at": "2011-04-01T00:00:00+00:00",
"_timestamp": "2011-04-01T00:00:00+00:00",
"_month_start_at": "2013-04-01T00:00:00+00:00",
"_quarter_start_at": "2013-04-01T00:00:00+00:00",
"_timestamp": "2013-04-01T00:00:00+00:00",
"_updated_at": "2014-03-19T10:44:32.286000+00:00",
"_week_start_at": "2011-03-28T00:00:00+00:00",
"_week_start_at": "2013-03-28T00:00:00+00:00",
"cost_per_transaction": 5.2,
"digital_cost_per_transaction": 2.52,
"digital_takeup": 0.965537995968002,
Expand All @@ -27,7 +27,7 @@
"_quarter_start_at": "2012-01-01T00:00:00+00:00",
"_timestamp": "2012-01-01T00:00:00+00:00",
"_updated_at": "2014-03-19T10:44:32.287000+00:00",
"_week_start_at": "2011-12-26T00:00:00+00:00",
"_week_start_at": "2013-12-26T00:00:00+00:00",
"cost_per_transaction": 2.63,
"digital_cost_per_transaction": 2.36,
"digital_takeup": 0.9756123825537215,
Expand All @@ -40,14 +40,14 @@
"type": "seasonally-adjusted"
},
{
"_day_start_at": "2011-04-01T00:00:00+00:00",
"_hour_start_at": "2011-04-01T00:00:00+00:00",
"_day_start_at": "2013-04-01T00:00:00+00:00",
"_hour_start_at": "2013-04-01T00:00:00+00:00",
"_id": "MjAxMS0wNC0wMSAwMDowMDowMDIwMTItMDQtMDEgMDA6MDA6MDBiaXMtYW5udWFsLXJldHVybnM=",
"_month_start_at": "2011-04-01T00:00:00+00:00",
"_quarter_start_at": "2011-04-01T00:00:00+00:00",
"_timestamp": "2011-04-01T00:00:00+00:00",
"_month_start_at": "2013-04-01T00:00:00+00:00",
"_quarter_start_at": "2013-04-01T00:00:00+00:00",
"_timestamp": "2013-04-01T00:00:00+00:00",
"_updated_at": "2014-03-19T10:44:32.286000+00:00",
"_week_start_at": "2011-03-28T00:00:00+00:00",
"_week_start_at": "2013-03-28T00:00:00+00:00",
"cost_per_transaction": 5.2,
"digital_cost_per_transaction": 2.52,
"digital_takeup": 0.965537995968002,
Expand All @@ -67,7 +67,7 @@
"_quarter_start_at": "2012-01-01T00:00:00+00:00",
"_timestamp": "2012-01-01T00:00:00+00:00",
"_updated_at": "2014-03-19T10:44:32.287000+00:00",
"_week_start_at": "2011-12-26T00:00:00+00:00",
"_week_start_at": "2013-12-26T00:00:00+00:00",
"cost_per_transaction": 2.63,
"digital_cost_per_transaction": 2.36,
"digital_takeup": 0.9756123825537215,
Expand Down
2 changes: 1 addition & 1 deletion tests/transformers/tasks/test_latest_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_compute(self, mock_dashboard, mock_dataset):
{
'_count': 1.0,
'_end_at': '2012-01-19T00:00:00+00:00',
'_timestamp': '2012`-01-12T00:00:00+00:00'
'_timestamp': '2012-01-12T00:00:00+00:00'
}
]
}
Expand Down
Loading

0 comments on commit 62e0284

Please sign in to comment.