Skip to content
This repository has been archived by the owner on Feb 13, 2020. It is now read-only.

Commit

Permalink
Merge better-next-job branch.
Browse files Browse the repository at this point in the history
  • Loading branch information
cyrusdaboo committed Jul 13, 2016
2 parents ca69273 + c4498a2 commit a9ba6d6
Show file tree
Hide file tree
Showing 17 changed files with 5,422 additions and 62 deletions.
27 changes: 14 additions & 13 deletions calendarserver/tools/dashtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def maxY(numHosts):

@staticmethod
def calculate(stats, item, hosts):
return sum([stats[onehost]["stats_system"]["cpu use"] for onehost in hosts])
return sum([stats[onehost]["stats_system"]["cpu use"] if stats[onehost] else 0 for onehost in hosts])



Expand All @@ -179,7 +179,7 @@ def maxY(numHosts):

@staticmethod
def calculate(stats, item, hosts):
return sum([stats[onehost]["stats_system"]["memory percent"] for onehost in hosts])
return sum([stats[onehost]["stats_system"]["memory percent"] if stats[onehost] else 0 for onehost in hosts])



Expand All @@ -203,7 +203,7 @@ def maxY(numHosts):

@staticmethod
def calculate(stats, item, hosts):
return sum([stats[onehost]["stats"]["1m"]["requests"] for onehost in hosts]) / 60.0
return sum([stats[onehost]["stats"]["1m"]["requests"] if stats[onehost] else 0 for onehost in hosts]) / 60.0



Expand All @@ -227,8 +227,8 @@ def maxY(numHosts):

@staticmethod
def calculate(stats, item, hosts):
tsum = sum([stats[onehost]["stats"]["1m"]["t"] for onehost in hosts])
rsum = sum([stats[onehost]["stats"]["1m"]["requests"] for onehost in hosts])
tsum = sum([stats[onehost]["stats"]["1m"]["t"] if stats[onehost] else 0 for onehost in hosts])
rsum = sum([stats[onehost]["stats"]["1m"]["requests"] if stats[onehost] else 0 for onehost in hosts])
return safeDivision(tsum, rsum)


Expand All @@ -253,7 +253,7 @@ def maxY(numHosts):

@staticmethod
def calculate(stats, item, hosts):
return sum([stats[onehost]["stats"]["1m"]["500"] for onehost in hosts]) / 60.0
return sum([stats[onehost]["stats"]["1m"]["500"] if stats[onehost] else 0 for onehost in hosts]) / 60.0



Expand All @@ -277,7 +277,7 @@ def maxY(numHosts):

@staticmethod
def calculate(stats, item, hosts):
return sum([stats[onehost]["stats"]["1m"]["401"] for onehost in hosts]) / 60.0
return sum([stats[onehost]["stats"]["1m"]["401"] if stats[onehost] else 0 for onehost in hosts]) / 60.0



Expand All @@ -301,7 +301,7 @@ def maxY(numHosts):

@staticmethod
def calculate(stats, item, hosts):
return sum([stats[onehost]["stats"]["1m"]["max-slots"] for onehost in hosts])
return sum([stats[onehost]["stats"]["1m"]["max-slots"] if stats[onehost] else 0 for onehost in hosts])



Expand All @@ -328,7 +328,7 @@ def maxY(numHosts):
def calculate(stats, item, hosts):
result = 0
for onehost in hosts:
completed = sum(map(operator.itemgetter(2), stats[onehost]["job_assignments"]["workers"]))
completed = sum(map(operator.itemgetter(2), stats[onehost]["job_assignments"]["workers"])) if stats[onehost] else 0
delta = completed - JobsCompletedDataType.lastCompleted[onehost] if JobsCompletedDataType.lastCompleted[onehost] else 0
if delta >= 0:
result += delta
Expand Down Expand Up @@ -358,7 +358,7 @@ def maxY(numHosts):

@staticmethod
def calculate(stats, item, hosts):
return sum([stats[onehost]["stats"]["1m"]["method"].get(item, 0) for onehost in hosts])
return sum([stats[onehost]["stats"]["1m"]["method"].get(item, 0) if stats[onehost] else 0 for onehost in hosts])



Expand All @@ -383,8 +383,8 @@ def maxY(numHosts):

@staticmethod
def calculate(stats, item, hosts):
tsum = sum([stats[onehost]["stats"]["1m"]["method-t"].get(item, 0) for onehost in hosts])
rsum = sum([stats[onehost]["stats"]["1m"]["method"].get(item, 0) for onehost in hosts])
tsum = sum([stats[onehost]["stats"]["1m"]["method-t"].get(item, 0) if stats[onehost] else 0 for onehost in hosts])
rsum = sum([stats[onehost]["stats"]["1m"]["method"].get(item, 0) if stats[onehost] else 0 for onehost in hosts])
return safeDivision(tsum, rsum)


Expand All @@ -411,7 +411,8 @@ def maxY(numHosts):
def calculate(stats, item, hosts):
# Job queue stat only read for first host
onehost = sorted(stats.keys())[0]

if len(stats[onehost]) == 0:
return 0
if item:
return sum(map(operator.itemgetter("queued"), {k: v for k, v in stats[onehost]["jobs"].items() if k.startswith(item)}.values()))
else:
Expand Down
7 changes: 7 additions & 0 deletions conf/caldavd-stdconfig.plist
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@
<key>mediumPriorityLevel</key>
<integer>50</integer>

<!-- This is used to help with concurrency problems when the underlying DB
does not support a proper "LIMIT" term with the query (Oracle). It should
be set to no more than 1 plus the number of app-servers in use. For a
single app-server, always use 1. -->
<key>rowLimit</key>
<integer>1</integer>

<!-- When a job fails, reschedule it this number of seconds in the future -->
<key>failureRescheduleInterval</key>
<integer>60</integer>
Expand Down
2 changes: 1 addition & 1 deletion requirements-cs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
zope.interface==4.1.3
setuptools==18.5

--editable svn+http://svn.calendarserver.org/repository/calendarserver/twext/trunk@15744#egg=twextpy
--editable svn+http://svn.calendarserver.org/repository/calendarserver/twext/trunk@15762#egg=twextpy
cffi==1.7.0
pycparser==2.14
#twisted
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ def version():
}

if "ORACLE_HOME" in os.environ:
# We need a specific version here because we have to patch it
extras_requirements["Oracle"] = ["twextpy[Oracle]", "cx_Oracle==5.2"]


Expand Down
6 changes: 6 additions & 0 deletions twistedcaldav/stdconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@
"highPriorityLevel": 80, # Queue capacity (percentage) at which only high priority items are run
"mediumPriorityLevel": 50, # Queue capacity (percentage) at which only high/medium priority items are run

"rowLimit": 1, # This is used to help with concurrency problems when the underlying DB does
# not support a proper "LIMIT" term with the query (Oracle). It should be set to no more than
# 1 plus the number of app-servers in use. For a single app-server, always use 1.


"failureRescheduleInterval": 60, # When a job fails, reschedule it this number of seconds in the future
"lockRescheduleInterval": 60, # When a job can't run because of a lock, reschedule it this number of seconds in the future

Expand Down Expand Up @@ -1377,6 +1382,7 @@ def _updateWorkQueue(configDict, reloading=False):
"overloadLevel",
"highPriorityLevel",
"mediumPriorityLevel",
"rowLimit",
):
setattr(ControllerQueue, attr, getattr(config.WorkQueue, attr))

Expand Down
28 changes: 25 additions & 3 deletions txdav/base/datastore/dbapiclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

from txdav.common.icommondatastore import InternalDataStoreError

import datetime

import pg8000 as postgres
import six

Expand Down Expand Up @@ -125,7 +127,7 @@ def var(self, *args):
return self.realCursor.var(*args)


def execute(self, sql, args=()):
def mapArgs(self, args):
realArgs = []
for arg in args:
if isinstance(arg, str):
Expand All @@ -134,6 +136,7 @@ def execute(self, sql, args=()):
# application layer as they consume less memory, so do the
# conversion here.
arg = arg.decode('utf-8')

if isinstance(arg, unicode) and len(arg) > 1024:
# This *may* cause a type mismatch, but none of the non-CLOB
# strings that we're passing would allow a value this large
Expand All @@ -144,18 +147,37 @@ def execute(self, sql, args=()):
# it is:
v = self.var(cx_Oracle.NCLOB, len(arg) + 1)
v.setvalue(0, arg)

elif isinstance(arg, datetime.datetime):
# By default when cx_Oracle is passed a datetime object it maps it to a
# cx_Oracle.DATETIME variable which does not serialize fraction seconds
# into the query, or call, arguments. However, for high volume systems,
# we really want sub-second resolution for things like the job queue,
# so we want to serialize datetime as cx_Oracle.TIMESTAMP.
v = self.var(cx_Oracle.TIMESTAMP)
v.setvalue(0, arg)

else:
v = arg

realArgs.append(v)

return realArgs


def execute(self, sql, args=()):
realArgs = self.mapArgs(args)
return super(OracleCursorWrapper, self).execute(sql, realArgs)


def callproc(self, name, args=()):
return self.realCursor.callproc(name, args)
realArgs = self.mapArgs(args)
return self.realCursor.callproc(name, realArgs)


def callfunc(self, name, returnType, args=()):
return self.realCursor.callfunc(name, returnType, args)
realArgs = self.mapArgs(args)
return self.realCursor.callfunc(name, returnType, realArgs)



Expand Down
20 changes: 10 additions & 10 deletions txdav/common/datastore/sql_schema/current-oracle-dialect-extras.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

-- Extra schema to add to current-oracle-dialect.sql

create or replace function next_job(now in timestamp, min_priority in integer)
create or replace function next_job(now in timestamp, min_priority in integer, row_limit in integer)
return integer is
cursor c (priority number) is
cursor c (test_priority number) is
select JOB_ID from JOB
where PRIORITY = priority AND ASSIGNED is NULL and PAUSE = 0 and NOT_BEFORE <= now
where PRIORITY = test_priority and IS_ASSIGNED = 0 and PAUSE = 0 and NOT_BEFORE <= now and ROWNUM <= row_limit
for update skip locked;
result integer;
begin
Expand All @@ -41,17 +41,17 @@ begin
end;
/

create or replace function overdue_job(now timestamp)
create or replace function overdue_job(now in timestamp, row_limit in integer)
return integer is
cursor c1 is
cursor c is
select JOB_ID from JOB
where ASSIGNED is not NULL and OVERDUE <= now
for update skip locked;
where IS_ASSIGNED = 1 and OVERDUE <= now and ROWNUM <= row_limit
for update skip locked;
result integer;
begin
open c1;
fetch c1 into result;
close c1;
open c;
fetch c into result;
close c;
return result;
end;
/
Loading

0 comments on commit a9ba6d6

Please sign in to comment.