Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Stats Rebuild Aggregates shouldn't run more than once at the same time

  • Loading branch information...
commit eec3cd29e0008c0f5d66804aeff3b30106a53acb 1 parent 7badee0
@flavour authored
View
2  VERSION
@@ -1 +1 @@
-f21ad7b (2012-10-10 14:56:53)
+7badee0 (2012-10-11 00:10:37)
View
61 models/tasks.py
@@ -19,9 +19,8 @@ def gis_download_kml(record_id, filename, user_id=None):
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
- # Run the Task
- result = gis.download_kml(record_id, filename)
- return result
+ # Run the Task & return the result
+ return gis.download_kml(record_id, filename)
tasks["gis_download_kml"] = gis_download_kml
@@ -37,10 +36,9 @@ def gis_update_location_tree(feature, user_id=None):
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
- # Run the Task
+ # Run the Task & return the result
feature = json.loads(feature)
- result = gis.update_location_tree(feature)
- return result
+ return gis.update_location_tree(feature)
tasks["gis_update_location_tree"] = gis_update_location_tree
@@ -121,9 +119,8 @@ def msg_process_outbox(contact_method, user_id=None):
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
- # Run the Task
- result = msg.process_outbox(contact_method)
- return result
+ # Run the Task & return the result
+ return msg.process_outbox(contact_method)
tasks["msg_process_outbox"] = msg_process_outbox
@@ -135,9 +132,8 @@ def msg_process_inbound_email(username, user_id):
@param username: email address of the email source to read from.
This uniquely identifies one inbound email task.
"""
- # Run the Task
- result = msg.fetch_inbound_email(username)
- return result
+ # Run the Task & return the result
+ return msg.fetch_inbound_email(username)
tasks["msg_process_inbound_email"] = msg_process_inbound_email
@@ -149,9 +145,8 @@ def msg_twilio_inbound_sms(account, user_id):
@param account: account name for the SMS source to read from.
This uniquely identifies one inbound SMS task.
"""
- # Run the Task
- result = msg.twilio_inbound_sms(account)
- return result
+ # Run the Task & return the result
+ return msg.twilio_inbound_sms(account)
tasks["msg_twilio_inbound_sms"] = msg_twilio_inbound_sms
@@ -160,15 +155,18 @@ def msg_parse_workflow(workflow, source, user_id):
"""
Processes the msg_log for unparsed messages.
"""
- # Run the Task
- result = msg.parse_import(workflow, source)
- return result
+ # Run the Task & return the result
+ return msg.parse_import(workflow, source)
tasks["msg_parse_workflow"] = msg_parse_workflow
# --------------------------------------------------------------------------
def msg_search_subscription_notifications(frequency):
- return eden.msg.search_subscription_notifications(frequency=frequency)
+ """
+ Search Subscriptions & send Notifications.
+ """
+ # Run the Task & return the result
+ return s3db.msg_search_subscription_notifications(frequency=frequency)
tasks["msg_search_subscription_notifications"] = msg_search_subscription_notifications
@@ -183,9 +181,8 @@ def stats_group_clean(user_id=None):
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
- # Run the Task
- result = s3db.stats_group_clean()
- return result
+ # Run the Task & return the result
+ return s3db.stats_group_clean()
tasks["stats_group_clean"] = stats_group_clean
@@ -199,9 +196,8 @@ def stats_update_time_aggregate(data_id=None, user_id=None):
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
- # Run the Task
- result = s3db.stats_update_time_aggregate(data_id)
- return result
+ # Run the Task & return the result
+ return s3db.stats_update_time_aggregate(data_id)
tasks["stats_update_time_aggregate"] = stats_update_time_aggregate
@@ -224,14 +220,13 @@ def stats_update_aggregate_location(location_level,
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
- # Run the Task
- result = s3db.stats_update_aggregate_location(location_level,
- root_location_id,
- parameter_id,
- start_date,
- end_date,
- )
- return result
+ # Run the Task & return the result
+ return s3db.stats_update_aggregate_location(location_level,
+ root_location_id,
+ parameter_id,
+ start_date,
+ end_date,
+ )
tasks["stats_update_aggregate_location"] = stats_update_aggregate_location
View
3  modules/eden/msg.py
@@ -35,6 +35,7 @@
"S3TwitterModel",
"S3XFormsModel",
"S3ParsingModel",
+ "msg_search_subscription_notifications",
]
from gluon import *
@@ -814,7 +815,7 @@ def source_represent(id, show_link=True):
return repr
# =============================================================================
-def search_subscription_notifications(frequency):
+def msg_search_subscription_notifications(frequency):
"""
Send Notifications for all Subscriptions
"""
View
149 modules/eden/stats.py
@@ -241,11 +241,34 @@ def stats_rebuild_aggregates():
- should be reworked to delete old data after new data has been added?
"""
+ # Check to see whether an existing task is running and if it is then kill it
+ db = current.db
+ ttable = db.scheduler_task
+ rtable = db.scheduler_run
+ wtable = db.scheduler_worker
+ query = (ttable.task_name == "stats_group_clean") & \
+ (rtable.scheduler_task == ttable.id) & \
+ (rtable.status == "RUNNING")
+ rows = db(query).select(rtable.id,
+ rtable.scheduler_task,
+ rtable.worker_name)
+ now = current.request.utcnow
+ for row in rows:
+ db(wtable.worker_name == row.worker_name).update(status="KILL")
+ db(rtable.id == row.id).update(stop_time=now,
+ status="STOPPED")
+ db(ttable.id == row.scheduler_task).update(stop_time=now,
+ status="STOPPED")
+
+ # Mark all stats_group records as needing to be updated
s3db = current.s3db
+ db(s3db.stats_group.deleted != True).update(dirty=True)
+
+ # Delete the existing data
resource = s3db.resource("stats_aggregate")
resource.delete()
- current.db(s3db.stats_group.id > 0).update(dirty=True)
+ # Fire off a rebuild task
current.s3task.async("stats_group_clean")
# ---------------------------------------------------------------------
@@ -285,11 +308,21 @@ def stats_update_time_aggregate(cls, data_id=None):
if not data_id:
query = (dtable.deleted != True) & \
(dtable.approved_by != None)
- records = db(query).select()
+ records = db(query).select(dtable.location_id,
+ dtable.parameter_id,
+ dtable.data_id,
+ dtable.date,
+ dtable.value,
+ )
elif isinstance(data_id, Rows):
records = data_id
elif not isinstance(data_id, Row):
- records = db(dtable.data_id == data_id).select(limitby=(0, 1))
+ records = db(dtable.data_id == data_id).select(dtable.location_id,
+ dtable.parameter_id,
+ dtable.data_id,
+ dtable.date,
+ dtable.value,
+ limitby=(0, 1))
else:
records = [data_id]
data_id = data_id.data_id
@@ -490,7 +523,9 @@ def stats_update_time_aggregate(cls, data_id=None):
if changed_periods == []:
continue
# The following structures are used in the OPTIMISATION steps later
- loc_level_list[location_id] = gis_table[location_id].level
+ loc_level_list[location_id] = db(gis_table.id == location_id).select(gis_table.level,
+ limitby=(0, 1)
+ ).first().level
if parameter_id not in param_location_dict:
param_location_dict[parameter_id] = {location_id : changed_periods}
elif location_id not in param_location_dict[parameter_id]:
@@ -583,7 +618,7 @@ def stats_update_time_aggregate(cls, data_id=None):
# Now calculate the resilence indicators
vulnerability_resilience = s3db.vulnerability_resilience
resilience_pid = s3db.vulnerability_resilience_id()
- for (location_id, (period, loc_level,use_location)) in resilence_parents.items():
+ for (location_id, (period, loc_level, use_location)) in resilence_parents.items():
for (start_date, end_date) in changed_periods:
s, e = str(start_date), str(end_date)
vulnerability_resilience(loc_level,
@@ -725,8 +760,9 @@ def stats_aggregated_period(data_date = None):
if data_date is None:
data_date = date.today()
- soap = date(data_date.year, 1, 1)
- eoap = date(data_date.year, 12, 31)
+ year = data_date.year
+ soap = date(year, 1, 1)
+ eoap = date(year, 12, 31)
return (soap, eoap)
# =============================================================================
@@ -877,6 +913,9 @@ class S3StatsGroupModel(S3Model):
def model(self):
T = current.T
+ db = current.db
+ configure = self.configure
+ define_table = self.define_table
# ---------------------------------------------------------------------
# Document-source entities
@@ -898,7 +937,7 @@ def model(self):
# Reusable Field
source_id = S3ReusableField("source_id", table,
requires = IS_NULL_OR(
- IS_ONE_OF(current.db,
+ IS_ONE_OF(db,
"stats_source.source_id",
stats_source_represent)),
represent = stats_source_represent,
@@ -942,64 +981,64 @@ def model(self):
# Components
self.add_component("stats_group", stats_source=self.super_key(table))
- self.configure("stats_source",
- deduplicate = self.stats_source_duplicate,
- )
+ configure("stats_source",
+ deduplicate = self.stats_source_duplicate,
+ )
# ---------------------------------------------------------------------
# The type of document held as a stats_group.
#
tablename = "stats_group_type"
- table = self.define_table(tablename,
- Field("stats_group_instance",
- label=T("Instance Type")),
- Field("name",
- label=T("Name")),
- Field("display",
- label=T("Display")),
- *s3_meta_fields()
- )
+ table = define_table(tablename,
+ Field("stats_group_instance",
+ label=T("Instance Type")),
+ Field("name",
+ label=T("Name")),
+ Field("display",
+ label=T("Display")),
+ *s3_meta_fields()
+ )
# Reusable Field
group_type_id = S3ReusableField("group_type_id", table,
- requires = IS_NULL_OR(
- IS_ONE_OF(current.db,
- "stats_group_type.id",
- stats_group_type_represent)),
- represent = stats_group_type_represent,
- label = T("Source Type"),
- ondelete = "CASCADE")
+ requires = IS_NULL_OR(
+ IS_ONE_OF(db,
+ "stats_group_type.id",
+ stats_group_type_represent)),
+ represent = stats_group_type_represent,
+ label = T("Source Type"),
+ ondelete = "CASCADE")
# Resource Configuration
- self.configure("stats_group_type",
- deduplicate=self.stats_group_type_duplicate,
- )
+ configure("stats_group_type",
+ deduplicate=self.stats_group_type_duplicate,
+ )
# ---------------------------------------------------------------------
# Container for documents and stats records
#
tablename = "stats_group"
- table = self.define_table(tablename,
- # This is a component, so needs to be a super_link
- # - can't override field name, ondelete or requires
- self.super_link("source_id", "stats_source"),
- s3_date(label = T("Date Published")),
- self.gis_location_id(),
- group_type_id(),
- # Used to indicate if the record has not yet
- # been used in aggregate calculations
- Field("dirty", "boolean",
- #label = T("Dirty"),
- default=True,
- readable=False,
- writable=False),
- #Field("reliability",
- # label=T("Reliability")),
- #Field("review",
- # label=T("Review")),
- *s3_meta_fields()
- )
+ table = define_table(tablename,
+ # This is a component, so needs to be a super_link
+ # - can't override field name, ondelete or requires
+ self.super_link("source_id", "stats_source"),
+ s3_date(label = T("Date Published")),
+ self.gis_location_id(),
+ group_type_id(),
+ # Used to indicate if the record has not yet
+ # been used in aggregate calculations
+ Field("dirty", "boolean",
+ #label = T("Dirty"),
+ default=True,
+ readable=False,
+ writable=False),
+ #Field("reliability",
+ # label=T("Reliability")),
+ #Field("review",
+ # label=T("Review")),
+ *s3_meta_fields()
+ )
# Reusable Field
group_id = S3ReusableField("group_id", table,
- requires = IS_ONE_OF(current.db,
+ requires = IS_ONE_OF(db,
"stats_group.id",
stats_group_represent),
represent = stats_group_represent,
@@ -1008,10 +1047,10 @@ def model(self):
table.virtualfields.append(StatsGroupVirtualFields())
# Resource Configuration
- self.configure("stats_group",
- deduplicate=self.stats_group_duplicate,
- requires_approval = True,
- )
+ configure("stats_group",
+ deduplicate=self.stats_group_duplicate,
+ requires_approval = True,
+ )
# ---------------------------------------------------------------------
# Pass model-global names to response.s3
Please sign in to comment.
Something went wrong with that request. Please try again.