Encode 492 #506

Merged
pld merged 21 commits into master from encode-492

1 participant

@pld
  • This is a change to the way we store data; it will require migrating or clearing any current databases.
  • In observations, the reserved key dataset_observation_id has been removed; it fulfills the same function as dataset_id. A sketch of the new document layout follows.
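To make the change concrete, here is a rough sketch of the new document shapes, pieced together from the diff below (frame.py, mongo.py, observation.py). Column names and values are illustrative, not taken from a real dataset.

# Per-dataset encoding record, stored alongside the observations.
# ENCODING_DATASET_ID ('^^dataset_id_enc') links the encoding to its dataset;
# ENCODING ('enc') maps column names to short numeric keys.
encoding_record = {
    '^^dataset_id_enc': 'f0e1d2...',   # the dataset's dataset_id (illustrative)
    'enc': {
        '^^dataset_id': '0',           # dataset_id is always encoded as '0'
        '^^index': '1',
        'rating': '2',
    },
}

# An observation row is now stored under the encoded keys:
observation_row = {'0': 'f0e1d2...', '1': 0, '2': 5}

# Previously each row repeated the long reserved key
# BAMBOO_RESERVED_KEY_dataset_observation_id instead.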
@pld

Make a backup of the bamboo.io database, empty the bamboo.io database, then merge and deploy this. Sound good?
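For reference, a minimal sketch of those two pre-deploy steps, assuming the database is a stock MongoDB instance named "bamboo" and that mongodump is on the PATH (both assumptions, not taken from this PR):

import subprocess

from pymongo import MongoClient

# 1. Back up the production database before anything else.
subprocess.check_call(['mongodump', '--db', 'bamboo', '--out', 'backup-pre-encode'])

# 2. Empty the database so the new encoded layout starts clean.
MongoClient().drop_database('bamboo')

# 3. Merge and deploy as usual.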

@pld
pld commented

Rebased and green, will put on dev for testing.

@pld
pld commented

This is now on dev and I have successfully run the scripts/commands.sh tests against it.

@pld pld merged commit a3fcc95 into master
@pld pld deleted the encode-492 branch
Commits on Apr 4, 2013
  1. @pld

    numeric encode start

    pld authored
  2. @pld

    push encoding into batch save and read level, eliminate parent_dataset_id (obviated by dataset_id)

    pld authored
  3. @pld

    other query arg fixes

    pld authored
  4. @pld
  5. @pld
  6. @pld

    shorted reserved code

    pld authored
  7. @pld
  8. @pld

    fixes

    pld authored
Commits on Apr 6, 2013
  1. @pld

    cleanup batch for key encode

    pld authored
  2. @pld

    stricter create tests

    pld authored
  3. @pld

    store encoding on update

    pld authored
  4. @pld
  5. @pld
  6. @pld

    pass encoding through

    pld authored
  7. @pld

    encoding fix

    pld authored
Commits on Apr 7, 2013
  1. @pld
  2. @pld
  3. @pld
  4. @pld
Commits on Apr 8, 2013
  1. @pld

    refactor some calculator code

    pld authored
Commits on Apr 9, 2013
  1. @pld
21 bamboo/config/mongo_index.py
@@ -1,21 +0,0 @@
-from pymongo import ASCENDING
-
-from bamboo.config.db import Database
-
-
-def ensure_indexing():
- """Ensure that bamboo models are indexed."""
- db = Database.db()
- calculations = db.calculations
- datasets = db.datasets
- observations = db.observations
- datasets.ensure_index([
- ("BAMBOO_RESERVED_KEY_dataset_id", ASCENDING),
- ("BAMBOO_RESERVED_KEY_dataset_observation_id", ASCENDING)])
- observations.ensure_index([
- ("BAMBOO_RESERVED_KEY_dataset_observation_id", ASCENDING)])
- calculations.ensure_index([("BAMBOO_RESERVED_KEY_dataset_id", ASCENDING)])
-
-
-if __name__ == "__main__":
- ensure_indexing()
14 bamboo/controllers/datasets.py
@@ -120,8 +120,10 @@ def action(dataset, query=query, select=select, limit=limit):
if select:
select.update(dict(zip(groups, [1] * len(groups))))
- query_args = QueryArgs(
- query=query, select=select, limit=limit, order_by=order_by)
+ query_args = QueryArgs(query=query,
+ select=select,
+ limit=limit,
+ order_by=order_by)
dframe = dataset.dframe(query_args)
return dataset.summarize(dframe, groups=groups,
@@ -176,9 +178,11 @@ def action(dataset, limit=limit, query=query, select=select):
query = self.__parse_query(query)
select = self.__parse_select(select)
- query_args = QueryArgs(
- query=query, select=select, distinct=distinct,
- limit=limit, order_by=order_by)
+ query_args = QueryArgs(query=query,
+ select=select,
+ distinct=distinct,
+ limit=limit,
+ order_by=order_by)
if count:
return dataset.count(query_args)
1  bamboo/core/aggregations.py
@@ -333,6 +333,7 @@ class SumAggregation(Aggregation):
def reduce(self, dframe, columns):
self.columns = columns
self.column = columns[0]
+ dframe = dframe.reset_index()
dframe[self.name] += self.agg()[self.name]
return dframe
4 bamboo/core/aggregator.py
@@ -61,7 +61,7 @@ def update(self, child_dataset, calculator, formula):
# get dframe only including rows from this parent
dframe = child_dataset.dframe(
keep_parent_ids=True,
- reload=True).only_rows_for_parent_id(parent_dataset_id)
+ reload_=True).only_rows_for_parent_id(parent_dataset_id)
# remove rows in child from parent
child_dataset.remove_parent_observations(parent_dataset_id)
@@ -75,8 +75,8 @@ def update(self, child_dataset, calculator, formula):
new_agg_dframe = concat([child_dataset.dframe(), dframe])
new_agg_dframe = new_agg_dframe.add_parent_column(parent_dataset_id)
-
child_dataset.replace_observations(new_agg_dframe)
+
return child_dataset.dframe()
def __dframe_from_calculator(self, calculator, formula, dframe):
40 bamboo/core/calculator.py
@@ -6,9 +6,10 @@
from bamboo.core.aggregator import Aggregator
from bamboo.core.frame import BambooFrame, NonUniqueJoinError
from bamboo.core.parser import ParseError, Parser
-from bamboo.lib.mongo import MONGO_RESERVED_KEYS
+from bamboo.lib.mongo import MONGO_RESERVED_KEY_ID, MONGO_RESERVED_KEYS
from bamboo.lib.query_args import QueryArgs
from bamboo.lib.schema_builder import make_unique
+from bamboo.lib.utils import combine_dicts, to_list
class Calculator(object):
@@ -177,9 +178,8 @@ def parse_aggregation(self, formula, name, groups, dframe=None):
# get dframe with only necessary columns
# or dummy column (needed for count aggregation)
- select = {}
- select.update({group: 1 for group in groups})
- select.update({col: 1 for col in dependent_columns})
+ select = combine_dicts({group: 1 for group in groups},
+ {col: 1 for col in dependent_columns})
if select:
query_args = QueryArgs(select=select)
dframe = self.dataset.dframe(query_args=query_args)
@@ -197,19 +197,17 @@ def parse_columns(self, formula, name, dframe=None, length=None,
functions, dependent_columns = self.parser.parse_formula(formula)
# make select from dependent_columns
- if dframe is None and dependent_columns:
- query_args = QueryArgs(
- select={col: 1 for col in dependent_columns})
- dframe = self.dataset.dframe(
- query_args=query_args,
- keep_mongo_keys=True).set_index('MONGO_RESERVED_KEY_id')
- elif dframe is None:
- # constant column, use dummy
- query_args = QueryArgs(select={'_id': 1})
+ if dframe is None:
+ select = {col: 1 for col in dependent_columns} if\
+ dependent_columns else {'_id': 1}
+
dframe = self.dataset.dframe(
- query_args=query_args,
- keep_mongo_keys=True).set_index('MONGO_RESERVED_KEY_id')
- dframe['dummy'] = 0
+ query_args=QueryArgs(select=select),
+ keep_mongo_keys=True).set_index(MONGO_RESERVED_KEY_ID)
+
+ if not dependent_columns:
+ # constant column, use dummy
+ dframe['dummy'] = 0
columns = []
@@ -233,10 +231,11 @@ def _check_update_is_valid(self, new_dframe_raw):
:raises: `NonUniqueJoinError` if update is illegal given joins of
dataset.
"""
- # TODO do not call dframe in here
+ # TODO clean this up
if any([direction == 'left' for direction, _, on, __ in
self.dataset.joined_datasets]):
- if on in new_dframe_raw.columns and \
+ # TODO do not call dframe in here
+ if on in new_dframe_raw.columns and\
on in self.dataset.dframe().columns:
merged_join_column = concat(
[new_dframe_raw[on], self.dataset.dframe()[on]])
@@ -392,7 +391,6 @@ def __update_aggregate_dataset(self, formula, new_dframe, name, groups,
"""
# parse aggregation and build column arguments
aggregator = self.parse_aggregation(formula, name, groups, new_dframe)
-
new_agg_dframe = aggregator.update(agg_dataset, self, formula)
# jsondict from new dframe
@@ -437,9 +435,7 @@ def __create_calculations_to_groups_and_datasets(self, calculations):
def __slugify_data(self, new_data, labels_to_slugs):
slugified_data = []
-
- if not isinstance(new_data, list):
- new_data = [new_data]
+ new_data = to_list(new_data)
for row in new_data:
for key, value in row.iteritems():
4 bamboo/core/frame.py
@@ -9,14 +9,12 @@
# reserved bamboo keys
-BAMBOO_RESERVED_KEY_PREFIX = 'BAMBOO_RESERVED_KEY_'
+BAMBOO_RESERVED_KEY_PREFIX = '^^'
DATASET_ID = BAMBOO_RESERVED_KEY_PREFIX + 'dataset_id'
-DATASET_OBSERVATION_ID = BAMBOO_RESERVED_KEY_PREFIX + 'dataset_observation_id'
INDEX = BAMBOO_RESERVED_KEY_PREFIX + 'index'
PARENT_DATASET_ID = BAMBOO_RESERVED_KEY_PREFIX + 'parent_dataset_id'
BAMBOO_RESERVED_KEYS = [
DATASET_ID,
- DATASET_OBSERVATION_ID,
INDEX,
PARENT_DATASET_ID,
]
8 bamboo/core/parser.py
@@ -274,10 +274,10 @@ def parse_formula(self, input_str):
for column_function in self.column_functions:
functions.append(partial(column_function.eval))
dependent_columns = dependent_columns.union(
- self._get_dependent_columns(column_function))
+ self.__get_dependent_columns(column_function))
else:
functions.append(partial(self.parsed_expr.eval))
- dependent_columns = self._get_dependent_columns(self.parsed_expr)
+ dependent_columns = self.__get_dependent_columns(self.parsed_expr)
self.context.dependent_columns = dependent_columns
@@ -310,7 +310,7 @@ def validate_formula(self, formula):
return self.aggregation
- def _get_dependent_columns(self, parsed_expr):
+ def __get_dependent_columns(self, parsed_expr):
result = []
if not hasattr(self, 'context'):
return result
@@ -318,8 +318,10 @@ def _get_dependent_columns(self, parsed_expr):
def find_dependent_columns(parsed_expr, result):
dependent_columns = parsed_expr.dependent_columns(self.context)
result.extend(dependent_columns)
+
for child in parsed_expr.get_children():
find_dependent_columns(child, result)
+
return result
return set(find_dependent_columns(parsed_expr, result))
11 bamboo/lib/mongo.py
@@ -6,10 +6,13 @@
# MongoDB keys
-MONGO_RESERVED_KEYS = ['_id']
-MONGO_RESERVED_KEY_PREFIX = 'MONGO_RESERVED_KEY'
-MONGO_RESERVED_KEY_STRS = [MONGO_RESERVED_KEY_PREFIX + key
- for key in MONGO_RESERVED_KEYS]
+# TODO clean this up
+MONGO_RESERVED_KEY_PREFIX = '##'
+MONGO_ID = '_id'
+MONGO_RESERVED_KEYS = [MONGO_ID]
+MONGO_RESERVED_KEY_ID = MONGO_RESERVED_KEY_PREFIX + '_id'
+MONGO_RESERVED_KEY_STRS = [MONGO_RESERVED_KEY_ID]
+
ILLEGAL_VALUES = ['$', '.']
REPLACEMENT_VALUES = [b64encode(value) for value in ILLEGAL_VALUES]
32 bamboo/lib/query_args.py
@@ -1,18 +1,5 @@
class QueryArgs(object):
- @property
- def order_by(self):
- order_by = self.order_by_value
-
- if order_by:
- if order_by[0] in ('-', '+'):
- sort_dir, field = -1 if order_by[0] == '-' else 1, order_by[1:]
- else:
- sort_dir, field = 1, order_by
- order_by = [(field, sort_dir)]
-
- return order_by
-
- def __init__(self, query={}, select=None, distinct=None, limit=0,
+ def __init__(self, query=None, select=None, distinct=None, limit=0,
order_by=None):
"""A holder for query arguments.
@@ -28,13 +15,22 @@ def __init__(self, query={}, select=None, distinct=None, limit=0,
- ``order_by='mycolumn'``
- ``order_by='-mycolumn'``
"""
-
- self.query = query
+ self.query = query or {}
self.select = select
self.distinct = distinct
self.limit = limit
- self.order_by_value = order_by
+ self.order_by = self.__parse_order_by(order_by)
def __nonzero__(self):
return bool(self.query or self.select or self.distinct or self.limit
- or self.order_by_value)
+ or self.order_by)
+
+ def __parse_order_by(self, order_by):
+ if order_by:
+ if order_by[0] in ('-', '+'):
+ sort_dir, field = -1 if order_by[0] == '-' else 1, order_by[1:]
+ else:
+ sort_dir, field = 1, order_by
+ order_by = [(field, sort_dir)]
+
+ return order_by
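Below is a quick usage sketch of the refactored QueryArgs (values made up), showing the parsed order_by and the fixed mutable default for query:

from bamboo.lib.query_args import QueryArgs

args = QueryArgs(query={'rating': 5}, order_by='-rating')
print args.order_by   # [('rating', -1)]: a leading '-' means descending sort
print bool(args)      # True, the query is non-empty

empty = QueryArgs()
print empty.query     # {}: query defaults to a fresh dict, not a shared one
print bool(empty)     # False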
12 bamboo/lib/utils.py
@@ -31,3 +31,15 @@ def _parse_type(_type, value, default):
def combine_dicts(*dicts):
return dict(chain(*[_dict.iteritems() for _dict in dicts]))
+
+
+def invert_dict(dict_):
+ return {v: k for (k, v) in dict_.items()} if dict_ else {}
+
+
+def replace_keys(original, mapping):
+ return original and {mapping.get(k, k): v for k, v in original.iteritems()}
+
+
+def to_list(maybe_list):
+ return maybe_list if isinstance(maybe_list, list) else [maybe_list]
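A quick illustration of the new helpers (dictionary contents are made up):

from bamboo.lib.utils import invert_dict, replace_keys, to_list

encoding = {'rating': '2', 'amount': '3'}

print invert_dict(encoding)                  # {'2': 'rating', '3': 'amount'}
print replace_keys({'rating': 5}, encoding)  # {'2': 5}
print to_list({'rating': 5})                 # [{'rating': 5}]
print to_list([1, 2])                        # [1, 2], already a list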
65 bamboo/models/abstract_model.py
@@ -1,7 +1,3 @@
-from math import ceil
-
-from pymongo.errors import AutoReconnect
-
from bamboo.config.db import Database
from bamboo.core.frame import BAMBOO_RESERVED_KEYS
from bamboo.lib.decorators import classproperty
@@ -21,6 +17,7 @@ class AbstractModel(object):
"""
__collection__ = None
+ __collectionname__ = None
# delimiter when passing multiple groups as a string
GROUP_DELIMITER = ','
@@ -112,70 +109,18 @@ def find(cls, query_args, as_dict=False, as_cursor=False):
]
@classmethod
- def find_one(cls, query, select=None):
+ def find_one(cls, query, select=None, as_dict=False):
"""Return the first row matching `query` and `select` from MongoDB.
:param query: A query to pass to MongoDB.
:param select: An optional select to pass to MongoDB.
+ :param as_dict: If true, return dicts and not model instances.
:returns: A model instance of the row returned for this query and
select.
"""
- return cls(cls.collection.find_one(query, select))
-
- @classmethod
- def batch_save(cls, dframe):
- """Save records in batches to avoid document size maximum setting.
-
- :param dframe: A DataFrame to save in the current model.
- """
- def command(records):
- cls.collection.insert(records)
-
- batch_size = cls.DB_SAVE_BATCH_SIZE
-
- cls.__batch_command_wrapper(command, dframe, batch_size)
-
- @classmethod
- def batch_update(cls, dframe):
- """Update records in batches to avoid document size maximum setting.
- dframe must have column with record (object) ids.
-
- """
- def command(records):
- # mongodb has no batch updates (fail)
- for record in records:
- spec = {'_id': record['MONGO_RESERVED_KEY_id']}
- del record['MONGO_RESERVED_KEY_id']
- doc = {"$set": record}
- cls.collection.update(spec, doc)
-
- batch_size = cls.DB_SAVE_BATCH_SIZE
-
- cls.__batch_command_wrapper(command, dframe, batch_size)
-
- @classmethod
- def __batch_command_wrapper(cls, command, dframe, batch_size):
- try:
- cls.__batch_command(command, dframe, batch_size)
- except AutoReconnect:
- batch_size /= 2
-
- # If batch size drop is less than MIN_BATCH_SIZE, assume the
- # records are too large or there is another error and fail.
- if batch_size >= cls.MIN_BATCH_SIZE:
- cls.__batch_command_wrapper(command, dframe, batch_size)
-
- @classmethod
- def __batch_command(cls, command, dframe, batch_size):
- batches = int(ceil(float(len(dframe)) / batch_size))
-
- for batch in xrange(0, batches):
- start = batch * batch_size
- end = start + batch_size
- records = [
- row.to_dict() for (_, row) in dframe[start:end].iterrows()]
- command(records)
+ record = cls.collection.find_one(query, select)
+ return record if as_dict else cls(record)
def __init__(self, record=None):
"""Instantiate with data in `record`."""
6 bamboo/models/calculation.py
@@ -8,6 +8,7 @@
from bamboo.lib.exceptions import ArgumentError
from bamboo.lib.query_args import QueryArgs
from bamboo.lib.schema_builder import make_unique
+from bamboo.lib.utils import to_list
from bamboo.models.abstract_model import AbstractModel
@@ -217,6 +218,7 @@ def save(self, dataset, formula, name, group_str=None):
aggregated_dataset)
else:
+ # set group if aggregation and group unset
name = self.__check_name_and_make_unique(name, dataset)
record = {
@@ -239,8 +241,7 @@ def create(cls, dataset, formula, name, group=None):
@classmethod
def create_from_list_or_dict(cls, dataset, calculations):
- if isinstance(calculations, dict):
- calculations = [calculations]
+ calculations = to_list(calculations)
if not len(calculations) or not isinstance(calculations, list):
raise ArgumentError('Improper format for JSON calculations.')
@@ -259,7 +260,6 @@ def create_from_list_or_dict(cls, dataset, calculations):
parsed_calculations.append([
c[cls.FORMULA],
c[cls.NAME], group])
-
except KeyError as e:
raise ArgumentError('Required key %s not found in JSON' % e)
111 bamboo/models/dataset.py
@@ -2,16 +2,17 @@
from time import gmtime, strftime
from celery.task import task
-from pandas import concat, rolling_window
+from pandas import rolling_window, Series
from bamboo.core.calculator import Calculator
from bamboo.core.frame import BambooFrame, BAMBOO_RESERVED_KEY_PREFIX,\
- DATASET_ID, DATASET_OBSERVATION_ID, INDEX, PARENT_DATASET_ID
+ DATASET_ID, INDEX, PARENT_DATASET_ID
from bamboo.core.summary import summarize
from bamboo.lib.async import call_async
from bamboo.lib.io import ImportableDataset
from bamboo.lib.query_args import QueryArgs
from bamboo.lib.schema_builder import Schema
+from bamboo.lib.utils import replace_keys, to_list
from bamboo.models.abstract_model import AbstractModel
from bamboo.models.calculation import Calculation
from bamboo.models.observation import Observation
@@ -53,11 +54,6 @@ def __init__(self, record=None):
super(Dataset, self).__init__(record)
self.__dframe = None
- # commonly accessed variables
- @property
- def dataset_observation_id(self):
- return self.record[DATASET_OBSERVATION_ID]
-
@property
def dataset_id(self):
return self.record[DATASET_ID]
@@ -168,15 +164,13 @@ def cardinality(self, col):
return self.schema.cardinality(col)
def aggregated_dataset(self, groups):
- if not isinstance(groups, list):
- groups = [groups]
-
+ groups = to_list(groups)
_id = self.aggregated_datasets_dict.get(self.join_groups(groups))
return self.find_one(_id) if _id else None
- def dframe(self, query_args=QueryArgs(), keep_parent_ids=False,
- padded=False, index=False, reload=False, keep_mongo_keys=False):
+ def dframe(self, query_args=None, keep_parent_ids=False, padded=False,
+ index=False, reload_=False, keep_mongo_keys=False):
"""Fetch the dframe for this dataset.
:param query_args: An optional QueryArgs to hold the query arguments.
@@ -184,7 +178,7 @@ def dframe(self, query_args=QueryArgs(), keep_parent_ids=False,
default False.
:param padded: Used for joining, default False.
:param index: Return the index with dframe, default False.
- :param reload: Force refresh of data, default False.
+ :param reload_: Force refresh of data, default False.
:param keep_mongo_keys: Used for updating documents, default False.
:returns: Return BambooFrame with contents based on query parameters
@@ -195,13 +189,17 @@ def dframe(self, query_args=QueryArgs(), keep_parent_ids=False,
cacheable = not (query_args or keep_parent_ids or padded)
# use cached copy if we have already fetched it
- if cacheable and not reload and self.__dframe is not None:
+ if cacheable and not reload_ and self.__dframe is not None:
return self.__dframe
+ query_args = query_args or QueryArgs()
observations = self.observations(query_args, as_cursor=True)
- dframe = self.__batch_read_dframe_from_cursor(
- observations, query_args.distinct, query_args.limit)
+ if query_args.distinct:
+ return BambooFrame(observations)
+
+ dframe = Observation.batch_read_dframe_from_cursor(
+ self, observations, query_args.distinct, query_args.limit)
dframe.decode_mongo_reserved_keys(keep_mongo_keys=keep_mongo_keys)
@@ -230,11 +228,12 @@ def dframe(self, query_args=QueryArgs(), keep_parent_ids=False,
return dframe
- def count(self, query_args=QueryArgs()):
+ def count(self, query_args=None):
"""Return the count of rows matching query in dataset.
:param query_args: An optional QueryArgs to hold the query arguments.
"""
+ query_args = query_args or QueryArgs()
obs = self.observations(query_args, as_cursor=True)
count = len(obs) if query_args.distinct else obs.count()
@@ -285,7 +284,6 @@ def save(self, dataset_id=None):
record = {
DATASET_ID: dataset_id,
- DATASET_OBSERVATION_ID: uuid.uuid4().hex,
self.AGGREGATED_DATASETS: {},
self.CREATED_AT: strftime("%Y-%m-%d %H:%M:%S", gmtime()),
self.STATE: self.STATE_PENDING,
@@ -399,21 +397,15 @@ def info(self, update=None):
self.PARENT_IDS: parent_ids,
}
- def observations(self, query_args=QueryArgs(), as_cursor=False):
+ def observations(self, query_args=None, as_cursor=False):
"""Return observations for this dataset.
:param query_args: An optional QueryArgs to hold the query arguments.
:param as_cursor: Return the observations as a cursor.
"""
- if query_args.distinct:
- as_cursor = True
-
- observations = Observation.find(self, query_args, as_cursor=as_cursor)
+ query_args = query_args or QueryArgs()
- if query_args.distinct:
- observations = observations.distinct(query_args.distinct)
-
- return observations
+ return Observation.find(self, query_args, as_cursor=as_cursor)
def calculations(self):
"""Return the calculations for this dataset."""
@@ -434,8 +426,7 @@ def add_observations(self, new_data):
update_id = uuid.uuid4().hex
self.add_pending_update(update_id)
- if not isinstance(new_data, list):
- new_data = [new_data]
+ new_data = to_list(new_data)
calculator = Calculator(self)
@@ -544,21 +535,29 @@ def reload(self):
def clear_cache(self):
self.__dframe = None
+
return self
+ def add_id_column(self, dframe):
+ id_column = Series([self.dataset_id] * len(dframe))
+ id_column.name = DATASET_ID
+
+ return BambooFrame(dframe.join(id_column))
+
def encode_dframe_columns(self, dframe):
"""Encode the columns in `dframe` to slugs and add ID column.
- The ID column is the dataset_observation_id for this dataset. This is
+ The ID column is the dataset_id for this dataset. This is
used to link observations to a specific dataset.
:param dframe: The DataFame to rename columns in and add an ID column
to.
- :returns: A the modified `dframe` as a BambooFrame.
+ :returns: A modified `dframe` as a BambooFrame.
"""
+ dframe = self.add_id_column(dframe)
encoded_columns_map = self.schema.rename_map_for_dframe(dframe)
dframe = dframe.rename(columns=encoded_columns_map)
- dframe[DATASET_OBSERVATION_ID] = self.dataset_observation_id
+
return BambooFrame(dframe)
def new_agg_dataset(self, dframe, groups):
@@ -569,6 +568,7 @@ def new_agg_dataset(self, dframe, groups):
:param dframe: The DataFrame to store in the new aggregated dataset.
:param groups: The groups associated with this aggregated dataset.
+ :returns: The newly created aggregated dataset.
"""
agg_dataset = self.create()
agg_dataset.save_observations(dframe)
@@ -580,6 +580,8 @@ def new_agg_dataset(self, dframe, groups):
self.update({
self.AGGREGATED_DATASETS: agg_datasets_dict})
+ return agg_dataset
+
def has_pending_updates(self, update_id):
"""Check if this dataset has pending updates.
@@ -607,7 +609,19 @@ def update_complete(self, update_id):
{'_id': self.record['_id']},
{'$pull': {self.PENDING_UPDATES: update_id}})
- def resample(self, date_column, interval, how, query={}):
+ def update_stats(self, dframe, update=False):
+ """Update store statistics for this dataset.
+
+ :param dframe: Use this DataFrame for summary statistics.
+ :param update: Update or replace summary statistics, default False.
+ """
+ self.update({
+ self.NUM_ROWS: len(dframe),
+ self.STATE: self.STATE_READY,
+ })
+ self.summarize(dframe, update=update)
+
+ def resample(self, date_column, interval, how, query=None):
"""Resample a dataset given a new time frame.
:param date_column: The date column use as the index for resampling.
@@ -647,36 +661,5 @@ def set_olap_type(self, column, olap_type):
self.set_schema(schema, False)
- def __batch_read_dframe_from_cursor(self, observations, distinct, limit):
- """Read a DataFrame from a MongoDB Cursor in batches."""
- dframes = []
- batch = 0
-
- while True:
- start = batch * self.DB_READ_BATCH_SIZE
- end = start + self.DB_READ_BATCH_SIZE
-
- if limit > 0 and end > limit:
- end = limit
-
- # if there is a limit and we are done
- if start >= end:
- break
-
- current_observations = [ob for ob in observations[start:end]]
-
- # if the batches exhausted the data
- if not len(current_observations):
- break
-
- dframes.append(BambooFrame(current_observations))
-
- if not distinct:
- observations.rewind()
-
- batch += 1
-
- return BambooFrame(concat(dframes) if len(dframes) else [])
-
def __add_linked_data(self, link_key, existing_data, new_data):
self.update({link_key: existing_data + [new_data]})
243 bamboo/models/observation.py
@@ -1,17 +1,27 @@
-from bamboo.core.frame import BambooFrame, DATASET_OBSERVATION_ID, INDEX
+from math import ceil
+
+from pandas import concat
+from pymongo.errors import AutoReconnect
+
+from bamboo.core.frame import BambooFrame, DATASET_ID, INDEX
from bamboo.lib.datetools import parse_timestamp_query
+from bamboo.lib.mongo import MONGO_ID, MONGO_RESERVED_KEY_ID
from bamboo.lib.query_args import QueryArgs
+from bamboo.lib.utils import combine_dicts, invert_dict, replace_keys
from bamboo.models.abstract_model import AbstractModel
class Observation(AbstractModel):
__collectionname__ = 'observations'
+ ENCODING = 'enc'
+ ENCODING_DATASET_ID = '%s_%s' % (DATASET_ID, ENCODING)
@classmethod
def delete(cls, dataset, index):
query = {INDEX: index,
- DATASET_OBSERVATION_ID: dataset.dataset_observation_id}
+ DATASET_ID: dataset.dataset_id}
+ query = cls.encode(query, dataset=dataset)
super(cls, cls()).delete(query)
@@ -23,13 +33,31 @@ def delete_all(cls, dataset, query={}):
:param query: An optional query to restrict deletion.
"""
query.update({
- DATASET_OBSERVATION_ID: dataset.dataset_observation_id
+ DATASET_ID: dataset.dataset_id
})
+ query = cls.encode(query, dataset=dataset)
super(cls, cls()).delete(query)
@classmethod
- def find(cls, dataset, query_args=QueryArgs(), as_cursor=False):
+ def encoding(cls, dataset):
+ record = super(cls, cls).find_one({
+ cls.ENCODING_DATASET_ID: dataset.dataset_id
+ }).record
+ return record[cls.ENCODING] if record else None
+
+ @classmethod
+ def encode(cls, dict_, dataset=None, encoding=None):
+ if dataset:
+ encoding = cls.encoding(dataset)
+ return replace_keys(dict_, encoding) if encoding else dict_
+
+ @classmethod
+ def decoding(cls, dataset):
+ return invert_dict(cls.encoding(dataset))
+
+ @classmethod
+ def find(cls, dataset, query_args=None, as_cursor=False):
"""Return observation rows matching parameters.
:param dataset: Dataset to return rows for.
@@ -40,47 +68,66 @@ def find(cls, dataset, query_args=QueryArgs(), as_cursor=False):
:returns: A list of dictionaries matching the passed in `query` and
other parameters.
"""
- query = query_args.query
+ id_query = {DATASET_ID: dataset.dataset_id}
+ encoding = cls.encoding(dataset) or {}
- if dataset.schema:
- query = parse_timestamp_query(query, dataset.schema)
+ if query_args:
+ query = parse_timestamp_query(query_args.query, dataset.schema)
+ query.update(id_query)
+ query_args.query = cls.encode(query, encoding=encoding)
- query[DATASET_OBSERVATION_ID] = dataset.dataset_observation_id
- query_args.query = query
+ order_by = query_args.order_by
+ query_args.order_by = order_by and cls.encode(
+ dict(order_by), encoding=encoding).items()
- return super(cls, cls).find(
- query_args, as_dict=True, as_cursor=as_cursor)
+ select = query_args.select
+ query_args.select = select and cls.encode(
+ select, encoding=encoding)
+ else:
+ query_args = QueryArgs()
+ query_args.query = cls.encode(id_query, encoding=encoding)
+
+ distinct = query_args.distinct
+
+ records = super(cls, cls).find(query_args, as_dict=True,
+ as_cursor=(as_cursor or distinct))
+
+ if query_args.distinct:
+ records = records.distinct(encoding.get(distinct, distinct))
+
+ return records
@classmethod
def update_from_dframe(cls, dframe, dataset):
dataset.build_schema(dframe)
- # must have MONGO_RESERVED_KEY_id as index
- if not DATASET_OBSERVATION_ID in dframe.columns:
- cls.batch_update(dataset.encode_dframe_columns(
- dframe).reset_index())
- else:
- cls.batch_update(dframe.reset_index())
+ encoded_dframe = dframe.reset_index()
- # add metadata to dataset, discount ID column
- dataset.update({
- dataset.NUM_ROWS: len(dframe),
- dataset.STATE: cls.STATE_READY,
- })
- # TODO make summary update-friendly
- dataset.summarize(dframe, update=True)
+ if not DATASET_ID in encoded_dframe.columns:
+ encoded_dframe = dataset.encode_dframe_columns(encoded_dframe)
+
+ encoding = cls.encoding(dataset)
+ start = sorted(map(int, encoding.values()))[-1] + 1
+ combine_dicts(cls.__make_encoding(dframe, start), encoding)
+
+ cls.__batch_update(encoded_dframe, encoding)
+ cls.__store_encoding(dataset, encoding)
+ dataset.update_stats(dframe, update=True)
@classmethod
- def find_one(cls, dataset, index):
+ def find_one(cls, dataset, index, decode=True):
"""Return row by index.
:param dataset: The dataset to find the row for.
:param index: The index of the row to find.
"""
query = {INDEX: index,
- DATASET_OBSERVATION_ID: dataset.dataset_observation_id}
+ DATASET_ID: dataset.dataset_id}
+ query = cls.encode(query, dataset=dataset)
+ decoding = cls.decoding(dataset)
+ record = super(cls, cls).find_one(query, as_dict=True)
- return super(cls, cls).find_one(query)
+ return cls(cls.encode(record, encoding=decoding) if decode else record)
@classmethod
def save(cls, dframe, dataset):
@@ -99,20 +146,14 @@ def save(cls, dframe, dataset):
if not dataset.schema:
dataset.build_schema(dframe)
- dframe = cls.__add_index_to_dframe(dframe)
+ # Add index and encode columns.
+ encoded_dframe = dataset.encode_dframe_columns(
+ cls.__add_index_to_dframe(dframe))
- if not DATASET_OBSERVATION_ID in dframe.columns:
- # This dframe has not been saved before, encode its columns.
- cls.batch_save(dataset.encode_dframe_columns(dframe))
- else:
- cls.batch_save(dframe)
-
- # add metadata to dataset, discount ID column
- dataset.update({
- dataset.NUM_ROWS: len(dframe),
- dataset.STATE: cls.STATE_READY,
- })
- dataset.summarize(dframe)
+ encoding = cls.__make_encoding(encoded_dframe)
+ cls.__batch_save(encoded_dframe, encoding)
+ cls.__store_encoding(dataset, encoding)
+ dataset.update_stats(dframe)
@classmethod
def update(cls, dataset, index, record):
@@ -125,7 +166,8 @@ def update(cls, dataset, index, record):
:param dex: The index of the row to update.
:param record: The dictionary to update the row with.
"""
- observation = cls.find_one(dataset, index)
+ observation = cls.find_one(dataset, index, decode=False)
+ record = cls.encode(record, dataset=dataset)
super(cls, observation).update(record)
@classmethod
@@ -140,3 +182,122 @@ def __add_index_to_dframe(self, dframe):
dframe.rename(columns={'index': INDEX}, inplace=True)
return BambooFrame(dframe)
+
+ @classmethod
+ def __batch_save(cls, dframe, encoding):
+ """Save records in batches to avoid document size maximum setting.
+
+ :param dframe: A DataFrame to save in the current model.
+ """
+ def command(records, encoding):
+ cls.collection.insert(records)
+
+ batch_size = cls.DB_SAVE_BATCH_SIZE
+
+ cls.__batch_command_wrapper(command, dframe, encoding, batch_size)
+
+ @classmethod
+ def __batch_update(cls, dframe, encoding):
+ """Update records in batches to avoid document size maximum setting.
+
+ DataFrame must have column with record (object) ids.
+
+ :param dframe: The DataFrame to update.
+ """
+ def command(records, encoding):
+ # Encode the reserved key to access the row ID.
+ mongo_reserved_key_id = encoding.get(
+ MONGO_RESERVED_KEY_ID, MONGO_RESERVED_KEY_ID)
+
+ # MongoDB has no batch updates.
+ for record in records:
+ spec = {MONGO_ID: record[mongo_reserved_key_id]}
+ del record[mongo_reserved_key_id]
+ doc = {'$set': record}
+ cls.collection.update(spec, doc)
+
+ batch_size = cls.DB_SAVE_BATCH_SIZE
+
+ cls.__batch_command_wrapper(command, dframe, encoding, batch_size)
+
+ @classmethod
+ def __batch_command_wrapper(cls, command, dframe, encoding, batch_size):
+ try:
+ cls.__batch_command(command, dframe, encoding, batch_size)
+ except AutoReconnect:
+ batch_size /= 2
+
+ # If batch size drop is less than MIN_BATCH_SIZE, assume the
+ # records are too large or there is another error and fail.
+ if batch_size >= cls.MIN_BATCH_SIZE:
+ cls.__batch_command_wrapper(
+ command, dframe, encoding, batch_size)
+
+ @classmethod
+ def __batch_command(cls, command, dframe, encoding, batch_size):
+ batches = int(ceil(float(len(dframe)) / batch_size))
+
+ for batch in xrange(0, batches):
+ start = batch * batch_size
+ end = start + batch_size
+ current_dframe = dframe[start:end]
+ records = cls.__encode_records(current_dframe, encoding)
+ command(records, encoding)
+
+ @classmethod
+ def __make_encoding(cls, dframe, start=0):
+ # Ensure that DATASET_ID is first so that we can guarantee an index.
+ columns = [DATASET_ID] + sorted(dframe.columns - [DATASET_ID])
+ return {v: str(start + i) for (i, v) in enumerate(columns)}
+
+ @classmethod
+ def __encode_records(cls, dframe, encoding):
+ return [replace_keys(row.to_dict(), encoding) for (_, row)
+ in dframe.iterrows()]
+
+ @classmethod
+ def __store_encoding(cls, dataset, encoding):
+ """Store encoded columns with dataset.
+
+ :param dataset: The dataset to store the encoding with.
+ :param encoding: The encoding for dataset.
+ """
+ record = {cls.ENCODING_DATASET_ID: dataset.dataset_id,
+ cls.ENCODING: encoding}
+ super(cls, cls()).delete({cls.ENCODING_DATASET_ID: dataset.dataset_id})
+ super(cls, cls()).save(record)
+
+ @classmethod
+ def batch_read_dframe_from_cursor(cls, dataset, observations, distinct,
+ limit):
+ """Read a DataFrame from a MongoDB Cursor in batches."""
+ dframes = []
+ batch = 0
+ decoding = cls.decoding(dataset)
+
+ while True:
+ start = batch * cls.DB_READ_BATCH_SIZE
+ end = start + cls.DB_READ_BATCH_SIZE
+
+ if limit > 0 and end > limit:
+ end = limit
+
+ # if there is a limit and we are done
+ if start >= end:
+ break
+
+ current_observations = [
+ replace_keys(ob, decoding) for ob in observations[start:end]]
+
+ # if the batches exhausted the data
+ if not len(current_observations):
+ break
+
+ dframes.append(BambooFrame(current_observations))
+
+ if not distinct:
+ observations.rewind()
+
+ batch += 1
+
+ return BambooFrame(concat(dframes) if len(dframes) else [])
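To show what the encoding above produces, here is a small standalone paraphrase of __make_encoding and __encode_records (not the class methods themselves); the column names are made up:

DATASET_ID = '^^dataset_id'

def make_encoding(columns, start=0):
    # DATASET_ID is kept first so that it always encodes to '0'.
    ordered = [DATASET_ID] + sorted(c for c in columns if c != DATASET_ID)
    return {col: str(start + i) for i, col in enumerate(ordered)}

encoding = make_encoding([DATASET_ID, 'rating', 'amount'])
print encoding
# -> {'^^dataset_id': '0', 'amount': '1', 'rating': '2'}

row = {DATASET_ID: 'abc123', 'amount': 3, 'rating': 5}
print dict((encoding.get(k, k), v) for k, v in row.items())
# -> {'0': 'abc123', '1': 3, '2': 5}, which is what ends up in MongoDB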
10 bamboo/tests/controllers/test_calculations.py
@@ -89,12 +89,11 @@ def __verify_create(self, response):
self.__wait_for_calculation_ready(self.dataset_id, self.name)
dataset = Dataset.find_one(self.dataset_id)
+ dframe = dataset.dframe()
self.assertTrue(self.name in dataset.schema.keys())
-
- dataset = Dataset.find_one(self.dataset_id)
-
- self.assertEqual(TestAbstractDatasets.NUM_ROWS, len(dataset.dframe()))
+ self.assertTrue(self.name in dframe.columns)
+ self.assertEqual(TestAbstractDatasets.NUM_ROWS, len(dframe))
def test_show(self):
self.__post_formula()
@@ -178,7 +177,8 @@ def test_create_update_summary(self):
# stats should have new column for calculation
dataset = Dataset.find_one(self.dataset_id)
- self.assertTrue(self.name in dataset.stats.get(Dataset.ALL).keys())
+ stats = dataset.stats.get(Dataset.ALL)
+ self.assertTrue(self.name in stats.keys())
def test_delete_nonexistent_calculation(self):
dataset_id = self._post_file()
8 bamboo/tests/controllers/test_datasets_merge.py
@@ -69,9 +69,9 @@ def test_merge_datasets(self):
dframe1 = datasets[0].dframe()
merged_dataset = Dataset.find_one(result[Dataset.ID])
- merged_rows = merged_dataset.observations()
+ merged_dframe = merged_dataset.dframe(keep_parent_ids=True)
- for row in merged_rows:
+ for _, row in merged_dframe.iterrows():
self.assertTrue(PARENT_DATASET_ID in row.keys())
merged_dframe = merged_dataset.dframe()
@@ -125,9 +125,9 @@ def test_merge_datasets_async(self):
dframe1 = datasets[0].dframe()
merged_dataset = Dataset.find_one(merged_id)
- merged_rows = merged_dataset.observations()
+ merged_dframe = merged_dataset.dframe(keep_parent_ids=True)
- for row in merged_rows:
+ for _, row in merged_dframe.iterrows():
self.assertTrue(PARENT_DATASET_ID in row.keys())
merged_dframe = merged_dataset.dframe()
4 bamboo/tests/controllers/test_datasets_update.py
@@ -49,8 +49,8 @@ def test_setup_datasets(self):
def _test_update1(self):
for dataset_id in [self.merged_dataset1_id, self.merged_dataset2_id]:
merged_dataset = Dataset.find_one(dataset_id)
- merged_rows = merged_dataset.observations()
- for row in merged_rows:
+ merged_dframe = merged_dataset.dframe(keep_parent_ids=True)
+ for _, row in merged_dframe.iterrows():
self.assertTrue(PARENT_DATASET_ID in row.keys())
self._verify_dataset(self.dataset1_id,
8 bamboo/tests/models/test_observation.py
@@ -51,14 +51,18 @@ def test_find_with_select(self):
query_args = QueryArgs(select={"rating": 1})
rows = Observation.find(self.dataset, query_args)
self.assertTrue(isinstance(rows, list))
- self.assertEquals(sorted(rows[0].keys()), ['_id', 'rating'])
+ row = Observation.encode(
+ rows[0], encoding=Observation.decoding(self.dataset))
+ self.assertEquals(sorted(row.keys()), ['_id', 'rating'])
def test_find_with_select_and_query(self):
self._save_observations()
self.query_args.select = {"rating": 1}
rows = Observation.find(self.dataset, self.query_args)
self.assertTrue(isinstance(rows, list))
- self.assertEquals(sorted(rows[0].keys()), ['_id', 'rating'])
+ row = Observation.encode(
+ rows[0], encoding=Observation.decoding(self.dataset))
+ self.assertEquals(sorted(row.keys()), ['_id', 'rating'])
def test_delete(self):
self._save_observations()
13 scripts/commands.sh
@@ -1,6 +1,5 @@
-# var for bamboo server
-
-HOST="http://bamboo.io"
+# the dev server
+HOST='http://54.225.70.200'
WAIT_TIME=15
while getopts l opt
@@ -38,23 +37,17 @@ echo -e "\nRetrieve data for School Zone RWIKA"
RET=$(curl -#g $HOST/datasets/$ID?query='{"school_zone":"RWIKA"}')
echo $RET
-sleep $WAIT_TIME
-
echo -e "\nCalculate summary statistics for School Zone RWIKA"
# using slug for "School Zone", which is "school_zone"
RET=$(curl -#g $HOST/datasets/$ID/summary?query='{"school_zone":"RWIKA"}'\&select=all)
echo $RET
-sleep $WAIT_TIME
-
echo -e "\nCalculate summary statistics with a grouping (truncated to 1000 characters)"
echo -e "Group by District showing only PRIVATE schools"
# using slug for "Public or Private", which is "public_or_private"
RET=$(curl -#g $HOST/datasets/$ID/summary?query='{"public_or_private":"PRIVATE"}'\&select=all\&group=district)
echo $RET | cut -c -1000
-sleep $WAIT_TIME
-
echo -e "\nStore calculation named small_schools with formula acreage<10"
RET=$(curl -#X POST -d "name=small_schools&formula=acreage<10" $HOST/calculations/$ID)
echo $RET
@@ -70,7 +63,7 @@ sleep $WAIT_TIME
echo -e "\nStore calculation named male_female_teacher_ratio with formula:"
echo -e "(tsc_male_teachers+local_authority_male_teachers+pta_board_of_governors_male_teacher+other_male_teachers)/(tsc_female_teachers+local_authority_female_teachers+pta_board_of_governors_female_teacher+other_female_teachers)"
echo -e "plus signs must by URI encoded for curl to process them correctly."
-RET=$(curl -#X POST -d "name=male_female_teacher_ratio&formula=(tsc_male_teachers%2Blocal_authority_male_teachers%2Bpta_board_of_governors_male_teacher%2Bother_male_teachers)/(tsc_female_teachers%2Blocal_authority_female_teachers%2Bpta_board_of_governors_female_teacher%2Bother_female_teachers)'" $HOST/calculations/$ID)
+RET=$(curl -#X POST -d "name=male_female_teacher_ratio&formula=(tsc_male_teachers%2Blocal_authority_male_teachers%2Bpta_board_of_governors_male_teacher%2Bother_male_teachers)/(tsc_female_teachers%2Blocal_authority_female_teachers%2Bpta_board_of_governors_female_teacher%2Bother_female_teachers)" $HOST/calculations/$ID)
echo $RET
sleep $WAIT_TIME
25 scripts/db/migrate_to_encoded.py
@@ -0,0 +1,25 @@
+import argparse
+
+from pybamboo.connection import Connection
+from pybamboo.dataset import Dataset
+
+
+DEV_BAMBOO_URL = 'http://dev.bamboo.io/'
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-d', '--dataset', help='The dataset ID to migrate')
+ args = parser.parse_args()
+
+
+def main():
+ dataset_url = "http://bamboo.io/datasets/%s.csv" % args.dataset
+
+ dev_connect = Connection(url=DEV_BAMBOO_URL)
+
+ dataset = Dataset(url=dataset_url, connection=dev_connect)
+ print dataset.id
+
+
+main()
28 scripts/db/mongo_index.py
@@ -0,0 +1,28 @@
+#!/usr/bin/env python
+
+import os
+import sys
+sys.path.append(os.getcwd())
+
+from pymongo import ASCENDING
+
+from bamboo.config.db import Database
+from bamboo.core.frame import DATASET_ID
+from bamboo.models.observation import Observation
+
+
+def ensure_indexing():
+ """Ensure that bamboo models are indexed."""
+ db = Database.db()
+ calculations = db.calculations
+ datasets = db.datasets
+ observations = db.observations
+ datasets.ensure_index([(DATASET_ID, ASCENDING)])
+ # The encoded dataset_id will be set to '0'.
+ observations.ensure_index([("0", ASCENDING)])
+ observations.ensure_index([(Observation.ENCODING_DATASET_ID, ASCENDING)])
+ calculations.ensure_index([(DATASET_ID, ASCENDING)])
+
+
+if __name__ == "__main__":
+ ensure_indexing()