From 35c38da7125097985cd467d750827bcaef9291f8 Mon Sep 17 00:00:00 2001 From: Wilfredo Sanchez Date: Mon, 3 Mar 2014 23:47:56 +0000 Subject: [PATCH] lint stop using % formatting git-svn-id: https://svn.calendarserver.org/repository/calendarserver/CalendarServer/trunk@12787 e27351fd-9f3e-4f54-a53b-843176b1656c --- calendarserver/tap/caldav.py | 680 +++++++++++++++++++++----------- calendarserver/webadmin/work.py | 117 +++--- 2 files changed, 506 insertions(+), 291 deletions(-) diff --git a/calendarserver/tap/caldav.py b/calendarserver/tap/caldav.py index 04afa07bd..399fa6e97 100644 --- a/calendarserver/tap/caldav.py +++ b/calendarserver/tap/caldav.py @@ -40,7 +40,9 @@ from twisted.application.internet import TCPServer, UNIXServer from twisted.application.service import MultiService, IServiceMaker from twisted.application.service import Service -from twisted.internet.defer import gatherResults, Deferred, inlineCallbacks, succeed +from twisted.internet.defer import ( + gatherResults, Deferred, inlineCallbacks, succeed +) from twisted.internet.endpoints import UNIXClientEndpoint, TCP4ClientEndpoint from twisted.internet.process import ProcessExitedAlready from twisted.internet.protocol import ProcessProtocol @@ -58,13 +60,16 @@ from twext.enterprise.jobqueue import NonPerformingQueuer from twext.enterprise.jobqueue import PeerConnectionPool from twext.enterprise.jobqueue import WorkerFactory as QueueWorkerFactory -from twext.internet.fswatch import DirectoryChangeListener, IDirectoryChangeListenee +from twext.internet.fswatch import ( + DirectoryChangeListener, IDirectoryChangeListenee +) from twext.internet.ssl import ChainingOpenSSLContextFactory from twext.internet.tcp import MaxAcceptTCPServer, MaxAcceptSSLServer from twext.python.filepath import CachingFilePath from twext.python.log import Logger, LogLevel, replaceTwistedLoggers -from txweb2.channel.http import LimitingHTTPFactory, SSLRedirectRequest, \ - HTTPChannel +from txweb2.channel.http import ( + 
LimitingHTTPFactory, SSLRedirectRequest, HTTPChannel +) from txweb2.metafd import ConnectionLimiter, ReportingHTTPService from txweb2.server import Site @@ -74,9 +79,13 @@ from txdav.common.datastore.upgrade.sql.upgrade import ( UpgradeDatabaseCalendarDataStep, UpgradeDatabaseOtherStep, UpgradeDatabaseSchemaStep, UpgradeDatabaseAddressBookDataStep, - UpgradeAcquireLockStep, UpgradeReleaseLockStep, UpgradeDatabaseNotificationDataStep) + UpgradeAcquireLockStep, UpgradeReleaseLockStep, + UpgradeDatabaseNotificationDataStep +) from txdav.common.datastore.work.inbox_cleanup import scheduleFirstInboxCleanup -from txdav.common.datastore.work.revision_cleanup import scheduleFirstFindMinRevision +from txdav.common.datastore.work.revision_cleanup import ( + scheduleFirstFindMinRevision +) from txdav.dps.server import DirectoryProxyServiceMaker from txdav.dps.client import DirectoryService as DirectoryProxyClientService from txdav.who.groups import GroupCacher as NewGroupCacher @@ -104,14 +113,13 @@ from calendarserver.push.amppush import AMPPushMaster, AMPPushForwarder from calendarserver.push.applepush import ApplePushNotifierService from calendarserver.push.notifier import PushDistributor -from calendarserver.tap.util import ConnectionDispenser -from calendarserver.tap.util import Stepper -from calendarserver.tap.util import checkDirectories -from calendarserver.tap.util import getRootResource -from calendarserver.tap.util import oracleConnectorFromConfig -from calendarserver.tap.util import pgConnectorFromConfig -from calendarserver.tap.util import pgServiceFromConfig, getDBPool, MemoryLimitService -from calendarserver.tap.util import storeFromConfig +from calendarserver.tap.util import ( + ConnectionDispenser, Stepper, + checkDirectories, getRootResource, + oracleConnectorFromConfig, pgConnectorFromConfig, + pgServiceFromConfig, getDBPool, MemoryLimitService, + storeFromConfig +) try: from calendarserver.version import version @@ -121,10 +129,10 @@ sys.path.insert( 0, 
getModule(__name__).pathEntry.filePath.child("support").path) from version import version as getVersion - version = "%s (%s*)" % getVersion() + version = "{} ({}*)".format(getVersion()) from txweb2.server import VERSION as TWISTED_VERSION -TWISTED_VERSION = "CalendarServer/%s %s" % ( +TWISTED_VERSION = "CalendarServer/{} {}".format( version.replace(" ", ""), TWISTED_VERSION, ) @@ -212,7 +220,7 @@ class CalDAVStatisticsProtocol (Protocol): def connectionMade(self): stats = self.factory.logger.observer.getStats() - self.transport.write("%s\r\n" % (stats,)) + self.transport.write("{}\r\n".format(stats)) self.transport.loseConnection() @@ -234,10 +242,13 @@ def __init__(self, logEnabled, logPath, logRotateLength, logMaxFiles): """ @param logEnabled: Whether to write to a log file @type logEnabled: C{boolean} + @param logPath: the full path to the log file @type logPath: C{str} + @param logRotateLength: rotate when files exceed this many bytes @type logRotateLength: C{int} + @param logMaxFiles: keep at most this many files @type logMaxFiles: C{int} """ @@ -273,10 +284,12 @@ class CalDAVService (ErrorLoggingMultiService): connectionServiceName = "ConnectionService" def __init__(self, logObserver): - self.logObserver = logObserver # accesslog observer - ErrorLoggingMultiService.__init__(self, config.ErrorLogEnabled, + self.logObserver = logObserver # accesslog observer + ErrorLoggingMultiService.__init__( + self, config.ErrorLogEnabled, config.ErrorLogFile, config.ErrorLogRotateMB * 1024 * 1024, - config.ErrorLogMaxRotatedFiles) + config.ErrorLogMaxRotatedFiles + ) def privilegedStartService(self): @@ -288,7 +301,9 @@ def privilegedStartService(self): def stopService(self): """ Wait for outstanding requests to finish - @return: a Deferred which fires when all outstanding requests are complete + + @return: a Deferred which fires when all outstanding requests are + complete """ connectionService = self.getServiceNamed(self.connectionServiceName) # Note: removeService() also 
calls stopService() @@ -306,7 +321,7 @@ class CalDAVOptions (Options): "config", "f", DEFAULT_CONFIG_FILE, "Path to configuration file." ]] - zsh_actions = {"config" : "_files -g '*.plist'"} + zsh_actions = {"config": "_files -g '*.plist'"} def __init__(self, *args, **kwargs): super(CalDAVOptions, self).__init__(*args, **kwargs) @@ -382,7 +397,7 @@ def opt_option(self, option): self.overrides ) else: - self.opt_option("%s=True" % (option,)) + self.opt_option("{}=True".format(option)) opt_o = opt_option @@ -392,25 +407,27 @@ def postOptions(self): self.loadConfiguration() self.checkConfiguration() except ConfigurationError, e: - print("Invalid configuration: %s" % (e,)) + print("Invalid configuration:", e) sys.exit(1) def loadConfiguration(self): if not os.path.exists(self["config"]): - raise ConfigurationError("Config file %s not found. Exiting." - % (self["config"],)) + raise ConfigurationError( + "Config file {} not found. Exiting." + .format(self["config"]) + ) - print("Reading configuration from file: %s" % (self["config"],)) + print("Reading configuration from file:", self["config"]) config.load(self["config"]) for path in config.getProvider().importedFiles: - print("Imported configuration from file: '%s'" % (path,)) + print("Imported configuration from file:", path) for path in config.getProvider().includedFiles: - print("Adding configuration from file: '%s'" % (path,)) + print("Adding configuration from file:", path) for path in config.getProvider().missingFiles: - print("Missing configuration file: '%s'" % (path,)) + print("Missing configuration file:", path) config.updateDefaults(self.overrides) @@ -425,7 +442,9 @@ def checkConfiguration(self): # for a running server (but is fine for command-line utilities) if config.ProcessType not in ["Agent", "Utility"]: if not config.EnableCalDAV and not config.EnableCardDAV: - print("Neither EnableCalDAV nor EnableCardDAV are set to True.") + print( + "Neither EnableCalDAV nor EnableCardDAV are set to True." 
+ ) sys.exit(1) uid, gid = None, None @@ -436,8 +455,10 @@ def checkConfiguration(self): def gottaBeRoot(): if os.getuid() != 0: username = getpwuid(os.getuid()).pw_name - raise UsageError("Only root can drop privileges. You are: %r" - % (username,)) + raise UsageError( + "Only root can drop privileges. You are: {}" + .format(username) + ) if uid and uid != os.getuid(): gottaBeRoot() @@ -459,9 +480,11 @@ def gottaBeRoot(): # Check current umask and warn if changed oldmask = os.umask(config.umask) if oldmask != config.umask: - self.log.info("WARNING: changing umask from: 0%03o to 0%03o" - % (oldmask, config.umask)) - self.parent['umask'] = config.umask + self.log.info( + "WARNING: changing umask from: 0{old!03o} to 0{new!03o}", + old=oldmask, new=config.umask + ) + self.parent["umask"] = config.umask @@ -497,8 +520,10 @@ class SlaveSpawnerService(Service): - regular slave processes (CalDAV workers) """ - def __init__(self, maker, monitor, dispenser, dispatcher, configPath, - inheritFDs=None, inheritSSLFDs=None): + def __init__( + self, maker, monitor, dispenser, dispatcher, configPath, + inheritFDs=None, inheritSSLFDs=None + ): self.maker = maker self.monitor = monitor self.dispenser = dispenser @@ -524,8 +549,8 @@ def startService(self): self.monitor.addProcessObject(process, PARENT_ENVIRONMENT) if ( - config.DirectoryProxy.Enabled and - config.DirectoryProxy.SocketPath != "" + config.DirectoryProxy.Enabled and + config.DirectoryProxy.SocketPath != "" ): log.info("Adding directory proxy service") @@ -538,12 +563,13 @@ def startService(self): if config.GroupName: dpsArgv.extend(("-g", config.GroupName)) dpsArgv.extend(( - "--reactor=%s" % (config.Twisted.reactor,), + "--reactor={}".format(config.Twisted.reactor), "-n", "caldav_directoryproxy", "-f", self.configPath, )) - self.monitor.addProcess("directoryproxy", dpsArgv, - env=PARENT_ENVIRONMENT) + self.monitor.addProcess( + "directoryproxy", dpsArgv, env=PARENT_ENVIRONMENT + ) @@ -573,11 +599,23 @@ def 
startService(self): # ultimatelyPerform( ) handles groups correctly. Once that is fixed # these can be set to zero seconds in the future. if self.doImip: - yield scheduleNextMailPoll(self.store, int(config.LogID) if config.LogID else 5) + yield scheduleNextMailPoll( + self.store, + int(config.LogID) if config.LogID else 5 + ) if self.doGroupCaching: - yield scheduleNextGroupCachingUpdate(self.store, int(config.LogID) if config.LogID else 5) - yield scheduleFirstFindMinRevision(self.store, int(config.LogID) if config.LogID else 5) - yield scheduleFirstInboxCleanup(self.store, int(config.LogID) if config.LogID else 5) + yield scheduleNextGroupCachingUpdate( + self.store, + int(config.LogID) if config.LogID else 5 + ) + yield scheduleFirstFindMinRevision( + self.store, + int(config.LogID) if config.LogID else 5 + ) + yield scheduleFirstInboxCleanup( + self.store, + int(config.LogID) if config.LogID else 5 + ) @@ -607,12 +645,14 @@ def reExec(self): """ self.log.warn("SIGHUP received - restarting") try: - self.log.info("Removing pidfile: %s" % (self.pidfilePath,)) + self.log.info("Removing pidfile: {log_source.pidfilePath}") os.remove(self.pidfilePath) except OSError: pass - self.reactor.addSystemEventTrigger("after", "shutdown", os.execv, - sys.executable, [sys.executable] + sys.argv) + self.reactor.addSystemEventTrigger( + "after", "shutdown", os.execv, + sys.executable, [sys.executable] + sys.argv + ) self.reactor.stop() @@ -644,8 +684,10 @@ class PreProcessingService(Service): aren't allowed to upgrade the database). 
""" - def __init__(self, serviceCreator, connectionPool, store, logObserver, - storageService, reactor=None): + def __init__( + self, serviceCreator, connectionPool, store, logObserver, + storageService, reactor=None + ): """ @param serviceCreator: callable which will be passed the connection pool, store, and log observer, and should return a Service @@ -672,8 +714,10 @@ def stepWithResult(self, result): The final "step"; if we get here we know our store is ready, so we create the main service and pass in the store. """ - service = self.serviceCreator(self.connectionPool, self.store, - self.logObserver, self.storageService) + service = self.serviceCreator( + self.connectionPool, self.store, + self.logObserver, self.storageService + ) if self.parent is not None: self.reactor.callLater(0, service.setServiceParent, self.parent) return succeed(None) @@ -685,10 +729,14 @@ def stepWithFailure(self, failure): so we create the main service and pass in a None for the store. """ try: - service = self.serviceCreator(self.connectionPool, None, - self.logObserver, self.storageService) + service = self.serviceCreator( + self.connectionPool, None, + self.logObserver, self.storageService + ) if self.parent is not None: - self.reactor.callLater(0, service.setServiceParent, self.parent) + self.reactor.callLater( + 0, service.setServiceParent, self.parent + ) except StoreNotAvailable: self.reactor.stop() @@ -780,29 +828,43 @@ def makeService(self, options): """ replaceTwistedLoggers() - self.log.info("%s %s starting %s process..." 
% (self.description, version, config.ProcessType)) + self.log.info( + "{log_source.description} {version} starting " + "{config.ProcessType} process...", + version=version, config=config + ) try: from setproctitle import setproctitle + except ImportError: pass + else: execName = os.path.basename(sys.argv[0]) + if config.LogID: - logID = " #%s" % (config.LogID,) + logID = " #{}".format(config.LogID) else: logID = "" + if config.ProcessType is not "Utility": execName = "" - setproctitle("CalendarServer %s [%s%s] %s" % (version, config.ProcessType, logID, execName)) - serviceMethod = getattr(self, "makeService_%s" % (config.ProcessType,), None) + setproctitle( + "CalendarServer {} [{}{}] {}" + .format(version, config.ProcessType, logID, execName) + ) + + serviceMethod = getattr( + self, "makeService_{}".format(config.ProcessType), None + ) if not serviceMethod: raise UsageError( - "Unknown server type %s. " + "Unknown server type {}. " "Please choose: Slave, Single or Combined" - % (config.ProcessType,) + .format(config.ProcessType) ) else: # @@ -820,7 +882,7 @@ def makeService(self, options): try: service = serviceMethod(options) except ConfigurationError, e: - sys.stderr.write("Configuration error: %s\n" % (e,)) + sys.stderr.write("Configuration error: {}\n".format(e)) sys.exit(1) # @@ -836,7 +898,9 @@ def location(frame): if frame is None: return "Unknown" else: - return "%s: %s" % (frame.f_code.co_name, frame.f_lineno) + return "{frame.f_code.co_name}: {frame.f_lineno}".format( + frame=frame + ) return service @@ -867,34 +931,46 @@ def makeService_Slave(self, options): logObserver = AMPCommonAccessLoggingObserver() result = self.requestProcessingService(options, store, logObserver) directory = store.directoryService() + if pool is not None: pool.setServiceParent(result) if config.ControlSocket: id = config.ControlSocket - self.log.info("Control via AF_UNIX: %s" % (id,)) + self.log.info("Control via AF_UNIX: {id}", id=id) endpointFactory = lambda reactor: 
UNIXClientEndpoint( - reactor, id) + reactor, id + ) else: id = int(config.ControlPort) - self.log.info("Control via AF_INET: %d" % (id,)) + self.log.info("Control via AF_INET: {id}", id=id) endpointFactory = lambda reactor: TCP4ClientEndpoint( - reactor, "127.0.0.1", id) + reactor, "127.0.0.1", id + ) controlSocketClient = ControlSocket() + class LogClient(AMP): def startReceivingBoxes(self, sender): super(LogClient, self).startReceivingBoxes(sender) logObserver.addClient(self) + f = Factory() f.protocol = LogClient + controlSocketClient.addFactory(_LOG_ROUTE, f) + from txdav.common.datastore.sql import CommonDataStore as SQLStore + if isinstance(store, SQLStore): def queueMasterAvailable(connectionFromMaster): - store.queuer = store.queuer.transferProposalCallbacks(connectionFromMaster) - queueFactory = QueueWorkerFactory(store.newTransaction, - queueMasterAvailable) + store.queuer = store.queuer.transferProposalCallbacks( + connectionFromMaster + ) + queueFactory = QueueWorkerFactory( + store.newTransaction, queueMasterAvailable + ) controlSocketClient.addFactory(_QUEUE_ROUTE, queueFactory) + controlClient = ControlSocketConnectingService( endpointFactory, controlSocketClient ) @@ -917,8 +993,9 @@ def queueMasterAvailable(connectionFromMaster): # Optionally set up mail retrieval if config.Scheduling.iMIP.Enabled: - mailRetriever = MailRetriever(store, directory, - config.Scheduling.iMIP.Receiving) + mailRetriever = MailRetriever( + store, directory, config.Scheduling.iMIP.Receiving + ) mailRetriever.setServiceParent(result) else: mailRetriever = None @@ -932,8 +1009,8 @@ def queueMasterAvailable(connectionFromMaster): config.GroupCaching.ExpireSeconds, config.GroupCaching.LockSeconds, namespace=config.GroupCaching.MemcachedPool, - useExternalProxies=config.GroupCaching.UseExternalProxies - ) + useExternalProxies=config.GroupCaching.UseExternalProxies, + ) else: groupCacher = None @@ -948,24 +1025,32 @@ def decorateTransaction(txn): # Optionally enable Manhole 
access if config.Manhole.Enabled: try: - from twisted.conch.manhole_tap import makeService as manholeMakeService - portString = "tcp:%d:interface=127.0.0.1" % (config.Manhole.StartingPortNumber + int(config.LogID) + 1,) + from twisted.conch.manhole_tap import ( + makeService as manholeMakeService + ) + portString = "tcp:{:d}:interface=127.0.0.1".format( + config.Manhole.StartingPortNumber + int(config.LogID) + 1 + ) manholeService = manholeMakeService({ - "sshPort" : None, - "telnetPort" : portString, - "namespace" : { - "config" : config, - "service" : result, - "store" : store, - "directory" : directory, - }, - "passwd" : config.Manhole.PasswordFilePath, + "sshPort": None, + "telnetPort": portString, + "namespace": { + "config": config, + "service": result, + "store": store, + "directory": directory, + }, + "passwd": config.Manhole.PasswordFilePath, }) manholeService.setServiceParent(result) # Using print(because logging isn't ready at this point) - print("Manhole access enabled: %s" % (portString,)) + print("Manhole access enabled:", portString) + except ImportError: - print("Manhole access could not enabled because manhole_tap could not be imported") + print( + "Manhole access could not enabled because " + "manhole_tap could not be imported" + ) return result @@ -994,7 +1079,9 @@ def requestProcessingService(self, options, store, logObserver): # self.log.info("Setting up service") - self.log.info("Configuring access log observer: %s" % (logObserver,)) + self.log.info( + "Configuring access log observer: {observer}", observer=logObserver + ) service = CalDAVService(logObserver) rootResource = getRootResource(config, store, additional) @@ -1011,7 +1098,10 @@ def requestProcessingService(self, options, store, logObserver): requestFactory = underlyingSite if config.EnableSSL and config.RedirectHTTPToHTTPS: - self.log.info("Redirecting to HTTPS port %s" % (config.SSLPort,)) + self.log.info( + "Redirecting to HTTPS port {port}", port=config.SSLPort + ) + def 
requestFactory(*args, **kw): return SSLRedirectRequest(site=underlyingSite, *args, **kw) @@ -1026,15 +1116,21 @@ def requestFactory(*args, **kw): # if enabled. if config.StrictTransportSecuritySeconds: previousRequestFactory = requestFactory + def requestFactory(*args, **kw): request = previousRequestFactory(*args, **kw) + def responseFilter(ignored, response): ignored, secure = request.chanRequest.getHostInfo() if secure: - response.headers.addRawHeader("Strict-Transport-Security", - "max-age={max_age:d}" - .format(max_age=config.StrictTransportSecuritySeconds)) + response.headers.addRawHeader( + "Strict-Transport-Security", + "max-age={max_age:d}".format( + max_age=config.StrictTransportSecuritySeconds + ) + ) return response + responseFilter.handleErrors = True request.addResponseFilter(responseFilter) return request @@ -1066,7 +1162,8 @@ def updateFactory(configDict, reloading=False): (config.GroupCaching.Enabled and config.GroupCaching.EnableUpdater) ).setServiceParent(service) - # For calendarserver.tap.test.test_caldav.BaseServiceMakerTests.getSite(): + # For calendarserver.tap.test + # .test_caldav.BaseServiceMakerTests.getSite(): connectionService.underlyingSite = underlyingSite if config.InheritFDs or config.InheritSSLFDs: @@ -1077,7 +1174,10 @@ def updateFactory(configDict, reloading=False): try: contextFactory = self.createContextFactory() except SSLError, e: - log.error("Unable to set up SSL context factory: %s" % (e,)) + log.error( + "Unable to set up SSL context factory: {error}", + error=e + ) else: MaxAcceptSSLServer( int(fdAsStr), httpFactory, @@ -1099,7 +1199,10 @@ def updateFactory(configDict, reloading=False): try: contextFactory = self.createContextFactory() except SSLError, e: - self.log.error("Unable to set up SSL context factory: %s" % (e,)) + self.log.error( + "Unable to set up SSL context factory: {error}", + error=e + ) # None is okay as a context factory for ReportingHTTPService as # long as we will never receive a file descriptor with 
the # 'SSL' tag on it, since that's the only time it's used. @@ -1109,20 +1212,24 @@ def updateFactory(configDict, reloading=False): requestFactory, int(config.MetaFD), contextFactory ).setServiceParent(connectionService) - else: # Not inheriting, therefore we open our own: + else: # Not inheriting, therefore we open our own: for bindAddress in self._allBindAddresses(): self._validatePortConfig() if config.EnableSSL: for port in config.BindSSLPorts: - self.log.info("Adding SSL server at %s:%s" - % (bindAddress, port)) + self.log.info( + "Adding SSL server at {address}:{port}", + address=bindAddress, port=port + ) try: contextFactory = self.createContextFactory() except SSLError, e: - self.log.error("Unable to set up SSL context factory: %s" - % (e,)) - self.log.error("Disabling SSL port: %s" % (port,)) + self.log.error( + "Unable to set up SSL context factory: {error}" + "Disabling SSL port: {port}", + error=e, port=port + ) else: httpsService = MaxAcceptSSLServer( int(port), httpFactory, @@ -1210,15 +1317,17 @@ def slaveSvcCreator(pool, store, logObserver, storageService): observers = [] if config.Notifications.Services.APNS.Enabled: pushSubService = ApplePushNotifierService.makeService( - config.Notifications.Services.APNS, store) + config.Notifications.Services.APNS, store + ) observers.append(pushSubService) pushSubService.setServiceParent(result) if config.Notifications.Services.AMP.Enabled: - pushSubService = AMPPushMaster(None, result, + pushSubService = AMPPushMaster( + None, result, config.Notifications.Services.AMP.Port, config.Notifications.Services.AMP.EnableStaggering, - config.Notifications.Services.AMP.StaggerSeconds - ) + config.Notifications.Services.AMP.StaggerSeconds, + ) observers.append(pushSubService) if observers: pushDistributor = PushDistributor(observers) @@ -1227,8 +1336,9 @@ def slaveSvcCreator(pool, store, logObserver, storageService): # Optionally set up mail retrieval if config.Scheduling.iMIP.Enabled: - mailRetriever = 
MailRetriever(store, directory, - config.Scheduling.iMIP.Receiving) + mailRetriever = MailRetriever( + store, directory, config.Scheduling.iMIP.Receiving + ) mailRetriever.setServiceParent(result) else: mailRetriever = None @@ -1244,7 +1354,9 @@ def slaveSvcCreator(pool, store, logObserver, storageService): namespace=config.GroupCaching.MemcachedPool, useExternalProxies=config.GroupCaching.UseExternalProxies ) - newGroupCacher = NewGroupCacher(DirectoryProxyClientService(None)) + newGroupCacher = NewGroupCacher( + DirectoryProxyClientService(None) + ) else: groupCacher = None newGroupCacher = None @@ -1252,24 +1364,31 @@ def slaveSvcCreator(pool, store, logObserver, storageService): # Optionally enable Manhole access if config.Manhole.Enabled: try: - from twisted.conch.manhole_tap import makeService as manholeMakeService - portString = "tcp:%d:interface=127.0.0.1" % (config.Manhole.StartingPortNumber,) + from twisted.conch.manhole_tap import ( + makeService as manholeMakeService + ) + portString = "tcp:{:d}:interface=127.0.0.1".format( + config.Manhole.StartingPortNumber + ) manholeService = manholeMakeService({ - "sshPort" : None, - "telnetPort" : portString, - "namespace" : { - "config" : config, - "service" : result, - "store" : store, - "directory" : directory, - }, - "passwd" : config.Manhole.PasswordFilePath, + "sshPort": None, + "telnetPort": portString, + "namespace": { + "config": config, + "service": result, + "store": store, + "directory": directory, + }, + "passwd": config.Manhole.PasswordFilePath, }) manholeService.setServiceParent(result) # Using print(because logging isn't ready at this point) - print("Manhole access enabled: %s" % (portString,)) + print("Manhole access enabled:", portString) except ImportError: - print("Manhole access could not enabled because manhole_tap could not be imported") + print( + "Manhole access could not enabled because " + "manhole_tap could not be imported" + ) # Optionally enable Directory Proxy if 
config.DirectoryProxy.Enabled: @@ -1302,7 +1421,8 @@ def decorateTransaction(txn): for name, pool in config.Memcached.Pools.items(): if pool.ServerEnabled: self.log.info( - "Adding memcached service for pool: %s" % (name,) + "Adding memcached service for pool: {name}", + name=name, pool=pool ) memcachedArgv = [ config.Memcached.memcached, @@ -1319,7 +1439,9 @@ def decorateTransaction(txn): memcachedArgv.extend(config.Memcached.Options) Popen(memcachedArgv) - return self.storageService(slaveSvcCreator, logObserver, uid=uid, gid=gid) + return self.storageService( + slaveSvcCreator, logObserver, uid=uid, gid=gid + ) def makeService_Utility(self, options): @@ -1342,9 +1464,9 @@ def makeService_Agent(self, options): Create an agent service which listens for configuration requests """ - # Don't use memcached initially -- calendar server might take it away at - # any moment. However, when we run a command through the gateway, it - # will conditionally set ClientEnabled at that time. + # Don't use memcached initially -- calendar server might take it away + # at any moment. However, when we run a command through the gateway, + # it will conditionally set ClientEnabled at that time. 
def agentPostUpdateHook(configDict, reloading=False): configDict.Memcached.Pools.Default.ClientEnabled = False @@ -1359,8 +1481,11 @@ def agentServiceCreator(pool, store, ignored, storageService): if storageService is not None: # Shut down if DataRoot becomes unavailable from twisted.internet import reactor - dataStoreWatcher = DirectoryChangeListener(reactor, - config.DataRoot, DataStoreMonitor(reactor, storageService)) + dataStoreWatcher = DirectoryChangeListener( + reactor, + config.DataRoot, + DataStoreMonitor(reactor, storageService) + ) dataStoreWatcher.startListening() if store is not None: store.queuer = NonPerformingQueuer() @@ -1373,12 +1498,14 @@ def agentServiceCreator(pool, store, ignored, storageService): config.AgentLogFile, config.ErrorLogRotateMB * 1024 * 1024, config.ErrorLogMaxRotatedFiles - ) + ) svc.setServiceParent(agentLoggingService) return agentLoggingService - def storageService(self, createMainService, logObserver, uid=None, gid=None): + def storageService( + self, createMainService, logObserver, uid=None, gid=None + ): """ If necessary, create a service to be started used for storage; for example, starting a database backend. This service will then start the @@ -1388,13 +1515,12 @@ def storageService(self, createMainService, logObserver, uid=None, gid=None): stand alone port-binding until the backing for the selected data store implementation is ready to process requests. - @param createMainService: This is the service that will be doing the main - work of the current process. If the configured storage mode does - not require any particular setup, then this may return the + @param createMainService: This is the service that will be doing the + main work of the current process. If the configured storage mode + does not require any particular setup, then this may return the C{mainService} argument. 
- - @type createMainService: C{callable} that takes C{(connectionPool, store)} - and returns L{IService} + @type createMainService: C{callable} that takes C{(connectionPool, + store)} and returns L{IService} @param uid: the user ID to run the backend as, if this process is running as root (also the uid to chown Attachments to). @@ -1405,21 +1531,24 @@ def storageService(self, createMainService, logObserver, uid=None, gid=None): @type gid: C{int} @return: the appropriate a service to start. - @rtype: L{IService} """ - def createSubServiceFactory(dialect=POSTGRES_DIALECT, - paramstyle='pyformat'): + def createSubServiceFactory( + dialect=POSTGRES_DIALECT, paramstyle='pyformat' + ): def subServiceFactory(connectionFactory, storageService): ms = MultiService() - cp = ConnectionPool(connectionFactory, dialect=dialect, - paramstyle=paramstyle, - maxConnections=config.MaxDBConnectionsPerPool) + cp = ConnectionPool( + connectionFactory, dialect=dialect, + paramstyle=paramstyle, + maxConnections=config.MaxDBConnectionsPerPool + ) cp.setServiceParent(ms) store = storeFromConfig(config, cp.connection) - pps = PreProcessingService(createMainService, cp, store, - logObserver, storageService) + pps = PreProcessingService( + createMainService, cp, store, logObserver, storageService + ) # The following "steps" will run sequentially when the service # hierarchy is started. 
If any of the steps raise an exception @@ -1479,13 +1608,14 @@ def subServiceFactory(connectionFactory, storageService): # Conditionally stop after upgrade at this point pps.addStep( QuitAfterUpgradeStep( - config.StopAfterUpgradeTriggerFile or config.UpgradeHomePrefix + config.StopAfterUpgradeTriggerFile or + config.UpgradeHomePrefix ) ) pps.addStep( - PostDBImportStep(store, config, - getattr(self, "doPostImport", True) + PostDBImportStep( + store, config, getattr(self, "doPostImport", True) ) ) @@ -1505,7 +1635,7 @@ def subServiceFactory(connectionFactory, storageService): if config.UseDatabase: - if os.getuid() == 0: # Only override if root + if os.getuid() == 0: # Only override if root overrideUID = uid overrideGID = gid else: @@ -1521,17 +1651,23 @@ def subServiceFactory(connectionFactory, storageService): uid=overrideUID, gid=overrideGID ) return pgserv - elif config.DBType == 'postgres': + elif config.DBType == "postgres": # Connect to a postgres database that is already running. - return createSubServiceFactory()(pgConnectorFromConfig(config), None) - elif config.DBType == 'oracle': + return createSubServiceFactory()( + pgConnectorFromConfig(config), None + ) + elif config.DBType == "oracle": # Connect to an Oracle database that is already running. 
- return createSubServiceFactory(dialect=ORACLE_DIALECT, - paramstyle='numeric')( + return createSubServiceFactory( + dialect=ORACLE_DIALECT, + paramstyle="numeric" + )( oracleConnectorFromConfig(config), None ) else: - raise UsageError("Unknown database type %r" (config.DBType,)) + raise UsageError( + "Unknown database type {}".format(config.DBType) + ) else: store = storeFromConfig(config, None) return createMainService(None, store, logObserver, None) @@ -1566,8 +1702,9 @@ def makeService_Combined(self, options): try: gid = getgrnam(config.GroupName).gr_gid except KeyError: - raise ConfigurationError("Invalid group name: %s" % - (config.GroupName,)) + raise ConfigurationError( + "Invalid group name: {}".format(config.GroupName) + ) else: gid = os.getgid() @@ -1575,8 +1712,9 @@ def makeService_Combined(self, options): try: uid = getpwnam(config.UserName).pw_uid except KeyError: - raise ConfigurationError("Invalid user name: %s" % - (config.UserName,)) + raise ConfigurationError( + "Invalid user name: {}".format(config.UserName) + ) else: uid = os.getuid() @@ -1584,7 +1722,10 @@ def makeService_Combined(self, options): controlSocket.addFactory(_LOG_ROUTE, logger) # Optionally set up AMPPushMaster - if config.Notifications.Enabled and config.Notifications.Services.AMP.Enabled: + if ( + config.Notifications.Enabled and + config.Notifications.Services.AMP.Enabled + ): ampSettings = config.Notifications.Services.AMP AMPPushMaster( controlSocket, @@ -1609,14 +1750,17 @@ def makeService_Combined(self, options): monitor.setServiceParent(s) if config.MemoryLimiter.Enabled: - memoryLimiter = MemoryLimitService(monitor, config.MemoryLimiter.Seconds, - config.MemoryLimiter.Bytes, config.MemoryLimiter.ResidentOnly) + memoryLimiter = MemoryLimitService( + monitor, config.MemoryLimiter.Seconds, + config.MemoryLimiter.Bytes, config.MemoryLimiter.ResidentOnly + ) memoryLimiter.setServiceParent(s) for name, pool in config.Memcached.Pools.items(): if pool.ServerEnabled: 
self.log.info( - "Adding memcached service for pool: %s" % (name,) + "Adding memcached service for pool: {name}", + name=name, pool=pool ) memcachedArgv = [ config.Memcached.memcached, @@ -1631,8 +1775,10 @@ def makeService_Combined(self, options): if config.UserName: memcachedArgv.extend(["-u", config.UserName]) memcachedArgv.extend(config.Memcached.Options) - monitor.addProcess('memcached-%s' % (name,), memcachedArgv, - env=PARENT_ENVIRONMENT) + monitor.addProcess( + "memcached-{}".format(name), memcachedArgv, + env=PARENT_ENVIRONMENT + ) # Open the socket(s) to be inherited by the slaves inheritFDs = [] @@ -1644,7 +1790,8 @@ def makeService_Combined(self, options): config.MultiProcess.ProcessCount)) dispatcher = cl.dispatcher else: - s._inheritedSockets = [] # keep a reference to these so they don't close + # keep a reference to these so they don't close + s._inheritedSockets = [] dispatcher = None for bindAddress in self._allBindAddresses(): @@ -1655,10 +1802,16 @@ def makeService_Combined(self, options): portsList.append((config.BindSSLPorts, "SSL")) for ports, description in portsList: for port in ports: - cl.addPortService(description, port, bindAddress, config.ListenBacklog) + cl.addPortService( + description, port, bindAddress, + config.ListenBacklog + ) else: def _openSocket(addr, port): - log.info("Opening socket for inheritance at %s:%d" % (addr, port)) + log.info( + "Opening socket for inheritance at {address}:{port}", + address=addr, port=port + ) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setblocking(0) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -1696,22 +1849,29 @@ def _openSocket(addr, port): # Optionally enable Manhole access if config.Manhole.Enabled: try: - from twisted.conch.manhole_tap import makeService as manholeMakeService - portString = "tcp:%d:interface=127.0.0.1" % (config.Manhole.StartingPortNumber,) + from twisted.conch.manhole_tap import ( + makeService as manholeMakeService + ) + portString = 
"tcp:{:d}:interface=127.0.0.1".format( + config.Manhole.StartingPortNumber + ) manholeService = manholeMakeService({ - "sshPort" : None, - "telnetPort" : portString, - "namespace" : { - "config" : config, - "service" : s, - }, - "passwd" : config.Manhole.PasswordFilePath, + "sshPort": None, + "telnetPort": portString, + "namespace": { + "config": config, + "service": s, + }, + "passwd": config.Manhole.PasswordFilePath, }) manholeService.setServiceParent(s) # Using print(because logging isn't ready at this point) - print("Manhole access enabled: %s" % (portString,)) + print("Manhole access enabled:", portString) except ImportError: - print("Manhole access could not enabled because manhole_tap could not be imported") + print( + "Manhole access could not be enabled because " + "manhole_tap could not be imported" + ) # Finally, let's get the real show on the road. Create a service that @@ -1727,9 +1887,13 @@ def spawnerSvcCreator(pool, store, ignored, storageService): raise StoreNotAvailable() from twisted.internet import reactor - pool = PeerConnectionPool(reactor, store.newTransaction, config.WorkQueue.ampPort) + pool = PeerConnectionPool( + reactor, store.newTransaction, config.WorkQueue.ampPort + ) store.queuer = store.queuer.transferProposalCallbacks(pool) - controlSocket.addFactory(_QUEUE_ROUTE, pool.workerListenerFactory()) + controlSocket.addFactory( + _QUEUE_ROUTE, pool.workerListenerFactory() + ) # TODO: now that we have the shared control socket, we should get # rid of the connection dispenser and make a shared / async # connection pool implementation that can dispense transactions @@ -1754,8 +1918,9 @@ def spawnerSvcCreator(pool, store, ignored, storageService): # Optionally set up mail retrieval if config.Scheduling.iMIP.Enabled: - mailRetriever = MailRetriever(store, directory, - config.Scheduling.iMIP.Receiving) + mailRetriever = MailRetriever( + store, directory, config.Scheduling.iMIP.Receiving + ) mailRetriever.setServiceParent(multi) else: mailRetriever 
= None @@ -1770,7 +1935,7 @@ def spawnerSvcCreator(pool, store, ignored, storageService): config.GroupCaching.LockSeconds, namespace=config.GroupCaching.MemcachedPool, useExternalProxies=config.GroupCaching.UseExternalProxies - ) + ) else: groupCacher = None @@ -1792,20 +1957,27 @@ def decorateTransaction(txn): def deleteStaleSocketFiles(self): # Check all socket files we use. - for checkSocket in [config.ControlSocket, config.Stats.UnixStatsSocket] : - + for checkSocket in [ + config.ControlSocket, config.Stats.UnixStatsSocket + ]: # See if the file exists. if (os.path.exists(checkSocket)): # See if the file represents a socket. If not, delete it. if (not stat.S_ISSOCK(os.stat(checkSocket).st_mode)): - self.log.warn("Deleting stale socket file (not a socket): %s" % checkSocket) + self.log.warn( + "Deleting stale socket file (not a socket): {socket}", + socket=checkSocket + ) os.remove(checkSocket) else: - # It looks like a socket. See if it's accepting connections. - tmpSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # It looks like a socket. + # See if it's accepting connections. + tmpSocket = socket.socket( + socket.AF_INET, socket.SOCK_STREAM + ) numConnectFailures = 0 testPorts = [config.HTTPPort, config.SSLPort] - for testPort in testPorts : + for testPort in testPorts: try: tmpSocket.connect(("127.0.0.1", testPort)) tmpSocket.shutdown(2) @@ -1814,7 +1986,11 @@ def deleteStaleSocketFiles(self): # If the file didn't connect on any expected ports, # consider it stale and remove it. 
if numConnectFailures == len(testPorts): - self.log.warn("Deleting stale socket file (not accepting connections): %s" % checkSocket) + self.log.warn( + "Deleting stale socket file " + "(not accepting connections): {socket}", + socket=checkSocket + ) os.remove(checkSocket) @@ -1869,11 +2045,13 @@ def __init__(self, twistd, tapname, configFile, id, interfaces, self.configFile = configFile self.id = id + def emptyIfNone(x): if x is None: return [] else: return x + self.inheritFDs = emptyIfNone(inheritFDs) self.inheritSSLFDs = emptyIfNone(inheritSSLFDs) self.metaSocket = metaSocket @@ -1883,7 +2061,7 @@ def emptyIfNone(x): def getName(self): - return '%s-%s' % (self.prefix, self.id) + return "{}-{}".format(self.prefix, self.id) def getFileDescriptors(self): @@ -1935,46 +2113,49 @@ def getCommandLine(self): args.extend(("-g", config.GroupName)) if config.Profiling.Enabled: - args.append( - "--profile=%s/%s.pstats" - % (config.Profiling.BaseDirectory, self.getName()) - ) + args.append("--profile={}/{}.pstats".format( + config.Profiling.BaseDirectory, self.getName() + )) args.extend(("--savestats", "--profiler", "cprofile-cpu")) args.extend([ - "--reactor=%s" % (config.Twisted.reactor,), + "--reactor={}".format(config.Twisted.reactor), "-n", self.tapname, "-f", self.configFile, "-o", "ProcessType=Slave", - "-o", "BindAddresses=%s" % (",".join(self.interfaces),), - "-o", "PIDFile=%s-instance-%s.pid" % (self.tapname, self.id,), + "-o", "BindAddresses={}".format(",".join(self.interfaces)), + "-o", "PIDFile={}-instance-{}.pid".format(self.tapname, self.id), "-o", "ErrorLogFile=None", "-o", "ErrorLogEnabled=False", - "-o", "LogID=%s" % (self.id,), - "-o", "MultiProcess/ProcessCount=%d" - % (config.MultiProcess.ProcessCount,), - "-o", "ControlPort=%d" - % (config.ControlPort,), + "-o", "LogID={}".format(self.id), + "-o", "MultiProcess/ProcessCount={:d}".format( + config.MultiProcess.ProcessCount + ), + "-o", "ControlPort={:d}".format(config.ControlPort), ]) if self.inheritFDs: 
args.extend([ - "-o", "InheritFDs=%s" % (",".join(map(str, self.inheritFDs)),) + "-o", "InheritFDs={}".format( + ",".join(map(str, self.inheritFDs)) + ) ]) if self.inheritSSLFDs: args.extend([ - "-o", "InheritSSLFDs=%s" % (",".join(map(str, self.inheritSSLFDs)),) + "-o", "InheritSSLFDs={}".format( + ",".join(map(str, self.inheritSSLFDs)) + ) ]) if self.metaSocket is not None: args.extend([ - "-o", "MetaFD=%s" % (self.metaSocket.fileno(),) - ]) + "-o", "MetaFD={}".format(self.metaSocket.fileno()) + ]) if self.ampDBSocket is not None: args.extend([ - "-o", "DBAMPFD=%s" % (self.ampDBSocket.fileno(),) - ]) + "-o", "DBAMPFD={}".format(self.ampDBSocket.fileno()) + ]) return args @@ -2064,10 +2245,13 @@ def addProcess(self, name, args, uid=None, gid=None, env={}): class SimpleProcessObject(object): def getName(self): return name + def getCommandLine(self): return args + def getFileDescriptors(self): return [] + self.addProcessObject(SimpleProcessObject(), env, uid, gid) @@ -2132,7 +2316,7 @@ def stopProcess(self, name): @param name: The name of the process to be stopped """ if name not in self.processes: - raise KeyError('Unrecognized process name: %s' % (name,)) + raise KeyError("Unrecognized process name: {}".format(name)) proto = self.protocols.get(name, None) if proto is not None: @@ -2143,8 +2327,8 @@ def stopProcess(self, name): pass else: self.murder[name] = self._reactor.callLater( - self.killTime, - self._forceStopProcess, proc) + self.killTime, self._forceStopProcess, proc + ) def processEnded(self, name): @@ -2241,7 +2425,7 @@ def reallyStartProcess(self, name): procObj, env, uid, gid = self.processes[name] self.timeStarted[name] = time() - childFDs = {0 : "w", 1 : "r", 2 : "r"} + childFDs = {0: "w", 1: "r", 2: "r"} childFDs.update(procObj.getFileDescriptors()) @@ -2263,17 +2447,19 @@ def startProcess(self, name): """ interval = (self.delayInterval * self._pendingStarts) self._pendingStarts += 1 + def delayedStart(): self._pendingStarts -= 1 
self.reallyStartProcess(name) + self._reactor.callLater(interval, delayedStart) def restartAll(self): """ Restart all processes. This is useful for third party management - services to allow a user to restart servers because of an outside change - in circumstances -- for example, a new version of a library is + services to allow a user to restart servers because of an outside + change in circumstances -- for example, a new version of a library is installed. """ for name in self.processes: @@ -2282,6 +2468,7 @@ def restartAll(self): def __repr__(self): l = [] + for name, (procObj, uid, gid, _ignore_env) in self.processes.items(): uidgid = '' if uid is not None: @@ -2291,10 +2478,12 @@ def __repr__(self): if uidgid: uidgid = '(' + uidgid + ')' - l.append('%r%s: %r' % (name, uidgid, procObj)) - return ('<' + self.__class__.__name__ + ' ' - + ' '.join(l) - + '>') + l.append("{!r}{}: {!r}".format(name, uidgid, procObj)) + + return ( + "<{self.__class__.__name__} {l}>" + .format(self=self, l=" ".join(l)) + ) @@ -2306,7 +2495,7 @@ class DelayedStartupLineLogger(object): MAX_LENGTH = 1024 CONTINUED_TEXT = " (truncated, continued)" tag = None - exceeded = False # Am I in the middle of parsing a long line? + exceeded = False  # Am I in the middle of parsing a long line? 
_buffer = '' def makeConnection(self, transport): @@ -2338,7 +2527,7 @@ def dataReceived(self, data): def lineReceived(self, line): from twisted.python.log import msg - msg('[%s] %s' % (self.tag, line)) + msg("[{}] {}".format(self.tag, line)) def lineLengthExceeded(self, line): @@ -2361,13 +2550,22 @@ def _breakLineIntoSegments(self, line): @return: array of C{str} """ length = len(line) - numSegments = length / self.MAX_LENGTH + (1 if length % self.MAX_LENGTH else 0) + + numSegments = ( + length / self.MAX_LENGTH + + (1 if length % self.MAX_LENGTH else 0) + ) + segments = [] + for i in range(numSegments): msg = line[i * self.MAX_LENGTH:(i + 1) * self.MAX_LENGTH] - if i < numSegments - 1: # not the last segment + + if i < numSegments - 1: # not the last segment msg += self.CONTINUED_TEXT + segments.append(msg) + return segments @@ -2424,10 +2622,14 @@ def getSSLPassphrase(*ignored): output, error = child.communicate() if child.returncode: - log.error("Could not get passphrase for %s: %s" - % (config.SSLPrivateKey, error)) + log.error( + "Could not get passphrase for {key}: {error}", + key=config.SSLPrivateKey, error=error + ) else: - log.info("Obtained passphrase for %s" % (config.SSLPrivateKey)) + log.info( + "Obtained passphrase for {key}", key=config.SSLPrivateKey + ) return output.strip() if ( @@ -2448,13 +2650,15 @@ def getSSLPassphrase(*ignored): sslPrivKey.close() if keyType is None: - log.error("Could not get private key type for %s" - % (config.SSLPrivateKey,)) + log.error( + "Could not get private key type for {key}", + key=config.SSLPrivateKey + ) else: child = Popen( args=[ config.SSLPassPhraseDialog, - "%s:%s" % (config.ServerHostName, config.SSLPort), + "{}:{}".format(config.ServerHostName, config.SSLPort), keyType, ], stdout=PIPE, stderr=PIPE, @@ -2462,8 +2666,10 @@ def getSSLPassphrase(*ignored): output, error = child.communicate() if child.returncode: - log.error("Could not get passphrase for %s: %s" - % (config.SSLPrivateKey, error)) + log.error( 
+ "Could not get passphrase for {key}: {error}", + key=config.SSLPrivateKey, error=error + ) else: return output.strip() @@ -2489,7 +2695,9 @@ def getSystemIDs(userName, groupName): try: uid = getpwnam(userName).pw_uid except KeyError: - raise ConfigurationError("Invalid user name: %s" % (userName,)) + raise ConfigurationError( + "Invalid user name: {}".format(userName) + ) else: uid = getuid() @@ -2497,7 +2705,9 @@ def getSystemIDs(userName, groupName): try: gid = getgrnam(groupName).gr_gid except KeyError: - raise ConfigurationError("Invalid group name: %s" % (groupName,)) + raise ConfigurationError( + "Invalid group name: {}".format(groupName) + ) else: gid = getgid() diff --git a/calendarserver/webadmin/work.py b/calendarserver/webadmin/work.py index 53aa6e1c2..7aad3dabb 100644 --- a/calendarserver/webadmin/work.py +++ b/calendarserver/webadmin/work.py @@ -39,10 +39,11 @@ # from twistedcaldav.directory.directory import GroupCacherPollingWork # from calendarserver.push.notifier import PushNotificationWork -from txdav.caldav.datastore.scheduling.work import ( - ScheduleOrganizerWork, ScheduleReplyWork, ScheduleRefreshWork -) +# from txdav.caldav.datastore.scheduling.work import ( +# ScheduleOrganizerWork, ScheduleReplyWork, ScheduleRefreshWork +# ) +from twext.enterprise.jobqueue import JobItem from .eventsource import EventSourceResource, IEventDecoder from .resource import PageElement, TemplateResource @@ -105,59 +106,63 @@ def poll(self): payload = {} - for workDescription, workItemClass, itemAttributes in ( - ( - u"Organizer Request", - ScheduleOrganizerWork, - ( - ("icalendarUid", "iCalendar UID"), - ("attendeeCount", "Attendee Count"), - ), - ), - ( - u"Attendee Reply", - ScheduleReplyWork, - ( - ("icalendarUid", "iCalendar UID"), - ), - ), - ( - u"Attendee Refresh", - ScheduleRefreshWork, - ( - ("icalendarUid", "iCalendar UID"), - ("attendeeCount", "Attendee Count"), - ), - ), - ): - workItems = yield workItemClass.all(txn) - - categoryData = [] - - for 
workItem in workItems: - itemData = {} - - for itemAttribute, itemDescription in itemAttributes: - itemData[itemDescription] = getattr( - workItem, itemAttribute - ) - - categoryData.append(itemData) - - payload[workDescription] = categoryData - - self.addEvents(( - dict( - eventClass=u"work", - eventText=asJSON(payload), - ), - )) - - if not hasattr(self, "_clock"): - from twisted.internet import reactor - self._clock = reactor - - # self._clock.callLater(5, self.poll) + records = yield JobItem.histogram(txn) + + + + # for workDescription, workItemClass, itemAttributes in ( + # ( + # u"Organizer Request", + # ScheduleOrganizerWork, + # ( + # ("icalendarUid", "iCalendar UID"), + # ("attendeeCount", "Attendee Count"), + # ), + # ), + # ( + # u"Attendee Reply", + # ScheduleReplyWork, + # ( + # ("icalendarUid", "iCalendar UID"), + # ), + # ), + # ( + # u"Attendee Refresh", + # ScheduleRefreshWork, + # ( + # ("icalendarUid", "iCalendar UID"), + # ("attendeeCount", "Attendee Count"), + # ), + # ), + # ): + # workItems = yield workItemClass.all(txn) + + # categoryData = [] + + # for workItem in workItems: + # itemData = {} + + # for itemAttribute, itemDescription in itemAttributes: + # itemData[itemDescription] = getattr( + # workItem, itemAttribute + # ) + + # categoryData.append(itemData) + + # payload[workDescription] = categoryData + + # self.addEvents(( + # dict( + # eventClass=u"work", + # eventText=asJSON(payload), + # ), + # )) + + # if not hasattr(self, "_clock"): + # from twisted.internet import reactor + # self._clock = reactor + + # # self._clock.callLater(5, self.poll)