Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

cleanup aggregation methods and aggregator eval

  • Loading branch information...
commit 24b0532a6aef0760635011c3fcfb4dfe8059cdd1 1 parent 2a1ce03
@pld pld authored
View
121 bamboo/core/aggregations.py
@@ -6,51 +6,61 @@ class Aggregation(object):
Abstract class for all aggregations.
"""
- def group(self, dframe, groups, columns):
+ def __init__(self, name, groups, dframe):
+ self.name = name
+ self.groups = groups
+ self.dframe = dframe
+
+ def _eval(self, columns):
+ self.columns = columns
+ self.column = columns[0]
+ return self.group() if self.groups else self.agg()
+
+ def group(self):
"""
For when aggregation is called with a group parameter.
"""
- column = columns[0]
- groupby = dframe[groups].join(
- column).groupby(groups, as_index=False)
- return groupby.agg(self.name)
+ groupby = self.dframe[self.groups].join(
+ self.column).groupby(self.groups, as_index=False)
+ return groupby.agg(self.formula_name)
- def column(self, columns, name):
+ def agg(self):
"""
For when aggregation is called without a group parameter.
"""
- column = columns[0]
- result = float(column.__getattribute__(self.name)())
- return DataFrame({name: Series([result])})
+ result = float(self.column.__getattribute__(self.formula_name)())
+ return DataFrame({self.name: Series([result])})
class MultiColumnAggregation(Aggregation):
"""
Interface for aggregations that create multiple columns.
"""
- def _reduce(self, dframe, columns, name):
- new_dframe = self.column(columns, name)
+ def _reduce(self, dframe, columns):
+ self.columns = columns
+ self.column = columns[0]
+ new_dframe = self.agg()
for column in new_dframe.columns:
dframe[column] += new_dframe[column]
- dframe[name] = self._agg_dframe(dframe, name)
+ dframe[self.name] = self._agg_dframe(dframe)
return dframe
- def _name_for_idx(self, name, idx):
- return '%s_%s' % (name, {
+ def _name_for_idx(self, idx):
+ return '%s_%s' % (self.name, {
0: 'numerator',
1: 'denominator',
}[idx])
- def _build_dframe(self, dframe, name, columns):
+ def _build_dframe(self, dframe, columns):
for idx, column in enumerate(columns):
- column.name = self._name_for_idx(name, idx)
+ column.name = self._name_for_idx(idx)
dframe = dframe.join(column)
return dframe
- def _agg_dframe(self, dframe, name):
- return dframe[self._name_for_idx(name, 0)].apply(float) /\
- dframe[self._name_for_idx(name, 1)]
+ def _agg_dframe(self, dframe):
+ return dframe[self._name_for_idx(0)].apply(float) /\
+ dframe[self._name_for_idx(1)]
class MaxAggregation(Aggregation):
@@ -58,7 +68,7 @@ class MaxAggregation(Aggregation):
Calculate the maximum.
"""
- name = 'max'
+ formula_name = 'max'
class MeanAggregation(MultiColumnAggregation):
@@ -66,32 +76,30 @@ class MeanAggregation(MultiColumnAggregation):
Calculate the arithmetic mean.
"""
- name = 'mean'
+ formula_name = 'mean'
- def column(self, columns, name):
- column = columns[0]
+ def agg(self):
dframe = DataFrame(index=[0])
- columns = [Series([col]) for col in [column.sum(), len(column)]]
- dframe = self._build_dframe(dframe, name, columns)
+ columns = [
+ Series([col]) for col in [self.column.sum(), len(self.column)]]
+ dframe = self._build_dframe(dframe, columns)
dframe = DataFrame([dframe.sum().to_dict()])
- column = self._agg_dframe(dframe, name)
- column.name = name
+ column = self._agg_dframe(dframe, self.name)
+ column.name = self.name
return dframe.join(column)
- def group(self, dframe, groups, columns):
- column = columns[0]
- name = column.name
- dframe = dframe[groups]
+ def group(self):
+ dframe = self.dframe[self.groups]
dframe = self._build_dframe(
- dframe, name, [column, Series([1] * len(column))])
- groupby = dframe.groupby(groups, as_index=False)
+ dframe, [self.column, Series([1] * len(self.column))])
+ groupby = dframe.groupby(self.groups, as_index=False)
aggregated_dframe = groupby.sum()
- new_column = self._agg_dframe(aggregated_dframe, name)
- new_column.name = name
+ new_column = self._agg_dframe(aggregated_dframe)
+ new_column.name = self.name
dframe = aggregated_dframe.join(new_column)
@@ -103,7 +111,7 @@ class MedianAggregation(Aggregation):
Calculate the median.
"""
- name = 'median'
+ formula_name = 'median'
class MinAggregation(Aggregation):
@@ -111,7 +119,7 @@ class MinAggregation(Aggregation):
Calculate the minimum.
"""
- name = 'min'
+ formula_name = 'min'
class SumAggregation(Aggregation):
@@ -119,10 +127,12 @@ class SumAggregation(Aggregation):
Calculate the sum.
"""
- name = 'sum'
+ formula_name = 'sum'
- def _reduce(self, dframe, columns, name):
- dframe[name] += self.column(columns, name)[name]
+ def _reduce(self, dframe, columns):
+ self.columns = columns
+ self.column = columns[0]
+ dframe[self.name] += self.agg()[self.name]
return dframe
@@ -133,39 +143,38 @@ class RatioAggregation(MultiColumnAggregation):
denominator columns.
"""
- name = 'ratio'
+ formula_name = 'ratio'
- def group(self, dframe, groups, columns):
+ def group(self):
# name of formula
- name = columns[0].name
- dframe = dframe[groups]
+ dframe = self.dframe[self.groups]
- dframe = self._build_dframe(dframe, name, columns)
+ dframe = self._build_dframe(dframe, self.columns)
- groupby = dframe.groupby(groups, as_index=False)
+ groupby = dframe.groupby(self.groups, as_index=False)
aggregated_dframe = groupby.sum()
- new_column = self._agg_dframe(aggregated_dframe, name)
+ new_column = self._agg_dframe(aggregated_dframe)
- new_column.name = name
+ new_column.name = self.name
dframe = aggregated_dframe.join(new_column)
return dframe
- def column(self, columns, name):
- dframe = DataFrame(index=columns[0].index)
+ def agg(self):
+ dframe = DataFrame(index=self.column.index)
- dframe = self._build_dframe(dframe, name, columns)
- column_names = [self._name_for_idx(name, i) for i in xrange(0, 2)]
+ dframe = self._build_dframe(dframe, self.columns)
+ column_names = [self._name_for_idx(i) for i in xrange(0, 2)]
dframe = dframe.dropna(subset=column_names)
dframe = DataFrame([dframe.sum().to_dict()])
- column = self._agg_dframe(dframe, name)
- column.name = name
+ column = self._agg_dframe(dframe)
+ column.name = self.name
return dframe.join(column)
AGGREGATIONS = dict([
- (cls.name, cls()) for cls in
+ (cls.formula_name, cls) for cls in
Aggregation.__subclasses__() + MultiColumnAggregation.__subclasses__()
- if hasattr(cls, 'name')])
+ if hasattr(cls, 'formula_name')])
View
31 bamboo/core/aggregator.py
@@ -15,19 +15,20 @@ class Aggregator(object):
dataset. Otherwise create a new linked dataset.
"""
- def __init__(self, dataset, dframe, columns, group_str, _type, name):
+ def __init__(self, dataset, dframe, group_str, _type, name):
self.dataset = dataset
self.dframe = dframe
- self.columns = columns
# MongoDB does not allow None as a key
self.group_str = group_str if group_str else ''
self.groups = split_groups(self.group_str) if group_str else None
self.name = name
- self.aggregation = AGGREGATIONS.get(_type)
+ self.aggregation = AGGREGATIONS.get(_type)(
+ self.name, self.groups, self.dframe)
- def save(self):
- new_dframe = BambooFrame(self.eval_dframe()).add_parent_column(
- self.dataset.dataset_id)
+ def save(self, columns):
+ new_dframe = BambooFrame(
+ self.aggregation._eval(columns)
+ ).add_parent_column(self.dataset.dataset_id)
aggregated_datasets = self.dataset.aggregated_datasets
agg_dataset = aggregated_datasets.get(self.group_str, None)
@@ -60,7 +61,7 @@ def save(self):
agg_dataset.replace_observations(new_dframe)
self.new_dframe = new_dframe
- def update(self, child_dataset, parser, formula):
+ def update(self, child_dataset, parser, formula, columns):
"""
Attempt to reduce an update and store.
"""
@@ -74,22 +75,14 @@ def update(self, child_dataset, parser, formula):
child_dataset.remove_parent_observations(parent_dataset_id)
if not self.groups and '_reduce' in dir(self.aggregation):
- dframe = BambooFrame(self.aggregation._reduce(
- dframe, self.columns, self.name))
+ dframe = BambooFrame(
+ self.aggregation._reduce(dframe, columns))
else:
- _, self.columns = parser._make_columns(
+ _, columns = parser._make_columns(
formula, self.name, self.dframe)
- new_dframe = self.eval_dframe()
+ new_dframe = self.aggregation._eval(columns)
dframe[self.name] = new_dframe[self.name]
new_agg_dframe = concat([child_dataset.dframe(), dframe])
return child_dataset.replace_observations(
new_agg_dframe.add_parent_column(parent_dataset_id))
-
- def eval_dframe(self):
- if self.group_str:
- # groupby on dframe then run aggregation on groupby obj
- return self.aggregation.group(self.dframe, self.groups,
- self.columns)
- else:
- return self.aggregation.column(self.columns, self.name)
View
8 bamboo/core/calculator.py
@@ -56,9 +56,9 @@ def calculate_column(self, formula, name, group_str=None):
aggregation, new_columns = self._make_columns(formula, name)
if aggregation:
- agg = Aggregator(self.dataset, self.dframe, new_columns,
+ agg = Aggregator(self.dataset, self.dframe,
group_str, aggregation, name)
- agg.save()
+ agg.save(new_columns)
else:
self.dataset.replace_observations(self.dframe.join(new_columns[0]))
@@ -225,9 +225,9 @@ def _update_aggregate_dataset(self, formula, new_dframe, name, group,
aggregation, new_columns = self._make_columns(
formula, name, new_dframe)
- agg = Aggregator(self.dataset, self.dframe, new_columns,
+ agg = Aggregator(self.dataset, self.dframe,
group, aggregation, name)
- new_agg_dframe = agg.update(agg_dataset, self, formula)
+ new_agg_dframe = agg.update(agg_dataset, self, formula, new_columns)
# jsondict from new dframe
new_data = new_agg_dframe.to_jsondict()
View
4 celeryd/celeryd
@@ -20,8 +20,8 @@
#set -e
-DEFAULT_PID_FILE="celeryd/celeryd@%n.pid"
-DEFAULT_LOG_FILE="celeryd/celeryd@%n.log"
+DEFAULT_PID_FILE="celeryd/celeryd.pid"
+DEFAULT_LOG_FILE="celeryd/celeryd.log"
DEFAULT_LOG_LEVEL="INFO"
DEFAULT_NODES="celery"
DEFAULT_CELERYD="-m celery.bin.celeryd_detach"
Please sign in to comment.
Something went wrong with that request. Please try again.