Skip to content


Subversion checkout URL

You can clone with
Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

286 lines (250 sloc) 12.894 kb
"""Time and schedule related models."""
import dateutil
from commonscripts import hours,parseTime, readCSV,getclass_inlist, drop_case_spaces,transpose,frange,getattrL,getTimeFormat,writeCSV
from operator import attrgetter
class Time(object):
    """
    Describes a time interval.

    :param Start: time interval begins. A
      :py:class:`datetime.datetime` object.
      If a string is input, it is parsed using ``parseTime``.
    :param End: time interval ends. Same parsing as :Start:.
    :param interval: the length of the time interval (optional).
      If specified :End: is optional. If not specified it is computed
      as ``End - Start`` and stored as a :py:class:`datetime.timedelta` object.
    :param index: label used by ``str()``; an integer is rendered
      zero-padded (``t03``), anything else is rendered as-is (``tInit``).
    :param formatter: forwarded to ``parseTime`` when :Start: or :End:
      is given as a string.
    :raises ValueError: if :Start:, :End: and :interval: are all given
      and ``End != Start + interval``.

    If neither :End: nor :interval: is given, the interval defaults to
    one hour.
    """

    def __init__(self, Start, End=None, interval=None, index=None, formatter=None):
        # Assign explicitly: the former vars(self).update(locals()) also
        # stored `self` on the instance (a `self.self` reference cycle).
        self.Start = Start
        self.End = End
        self.interval = interval
        self.index = index
        self.formatter = formatter

        if isinstance(Start, str):
            self.Start = parseTime(self.Start, formatter)
        if isinstance(End, str):
            self.End = parseTime(self.End, formatter)

        if interval is None and End is None:
            self.End = self.Start + hours(1)

        if interval is None:
            self.interval = self.End - self.Start
        elif End is None:
            self.End = self.Start + self.interval
        elif self.End != self.Start + self.interval:
            # Both End and interval were supplied: use one to check the other.
            raise ValueError(
                'End should be interval from Start: e!=s+i:\n{e}!={si}'.format(
                    e=self.End, s=self.Start, i=self.interval,
                    si=self.Start + self.interval))

    def Range(self, interval):
        """Return the list of datetimes from Start up to End, stepping by `interval`."""
        rangeLenHrs = hours(self.End - self.Start)
        # NOTE(review): the step was read from an undefined name and the
        # `interval` argument was unused. It is converted with hours() here,
        # mirroring the range-length conversion above -- this assumes
        # `interval` is a timedelta; confirm against callers.
        intervalStepHrs = hours(interval)
        return [self.Start + hours(t) for t in frange(0, rangeLenHrs, intervalStepHrs)]

    def time_passed_since(self, other):
        """Return the span from the start of `other` to the end of this interval."""
        return self.End - other.Start

    # Custom comparison operators (__sub__, __eq__, __ne__, __cmp__) are
    # not defined, so instances compare and hash by identity.

    def __str__(self):
        try:
            return 't{ind:02d}'.format(ind=self.index)
        except (ValueError, TypeError):
            # ValueError: index is a str (e.g. 'Init').
            # TypeError: index is None (or another non-numeric) on Python 3.
            return 't{ind}'.format(ind=self.index)

    def __unicode__(self):
        # Python 2 text protocol; never invoked by Python 3.
        return unicode(self.Start) + ' to ' + unicode(self.End)

    def __repr__(self):
        return repr(self.Start)
def make_times(datetimeL):
    '''
    Convert list of :py:class:`datetime.datetime` objects to a
    :class:`~schedule.Timelist` object.

    The interval is taken from the first two entries and the list is
    assumed to be uniformly spaced. At least two entries are required
    (``datetimeL[1]`` is read unconditionally).

    :param datetimeL: sequence of datetimes, in order.
    :returns: a Timelist spanning from the first datetime to one
      interval past the last datetime.
    '''
    # NOTE(review): the assignments of S and `times` were absent from the
    # source; they are restored from the names the remaining lines read
    # and from the Timelist(Start=, End=, interval=) constructor.
    S = datetimeL[0]  # first time
    I = datetimeL[1] - S  # interval
    E = datetimeL[-1] + I  # one past the last time
    times = Timelist(Start=S, End=E, interval=I)
    return times
# NOTE(review): only the signature, docstring and return statement of this
# function are present. `times` is never assigned in the visible body, and
# the statements that should build N hourly times (per the docstring) are
# missing -- restore them from the upstream source before use.
def make_times_basic(N):
''' make a :class:`schedule.Timelist` of N times, assume hourly interval.'''
return times
# NOTE(review): this class has lost its indentation, its docstring
# delimiters and several statements. The code lines below are left
# untouched; each comment flags a gap that must be restored from the
# upstream source before the class can run.
class Timelist(object):
A container for :class:`schedule.Time` objects.
:param list_of_times: a tuple, list, or existing :class:`~schedule.Timelist` object
:param Start: a :py:class:`datetime.datetime` object
:param End: a :py:class:`datetime.datetime` object
:param interval: a :py:class:`datetime.timedelta` object
If ``Start``, ``End``, ``interval`` are specified as
then a uniformly spaced list of times is generated.
Otherwise the list is created from ``list_of_times``.
def __init__(self, list_of_times=None,initialTime=None,Start=None,End=None,interval=None):
if Start and End and interval:
# NOTE(review): `steps` is read here but never assigned in the visible code;
# presumably a ratio of hours(End-Start) and hours(interval) -- confirm.
if steps==int(steps): steps=int(steps)
else: raise ValueError('Times must be integer interval. j/i={j}/{i} must be an integer.'.format(i=hours(End-Start),j=hours(interval)))
# Uniform list: one Time per step, each `interval` long, indexed from 0.
self.times=[Time(Start=Start+i*interval,interval=interval,index=i) for i in range(steps)]
# NOTE(review): as written this reset is unconditional and would discard the
# list built above; an `else:` appears to be missing before it -- confirm.
self.times = ()
if list_of_times is not None:
# A tuple is stored as-is, a Timelist contributes a copy of its times,
# anything else is converted to a tuple.
if type(list_of_times) == type(self.times): self.times = list_of_times
elif isinstance(list_of_times, Timelist): self.times = list_of_times.times[:]
else: self.times = tuple(list_of_times)
# NOTE(review): `interval` here is the constructor argument (None unless
# passed); the error text refers to the list's first element, so it was
# presumably meant to be read from self.times[0] -- confirm.
for t in self.times:
if t.interval != interval: raise ValueError('time intervals within list varies at {t}. This time has interval {i}. List (1st element) has interval of {li}.:'.format(t=t.Start,i=t.interval,li=interval))
# Cache the interval and the overall span, both as given and via hours().
self.interval = interval
self.intervalhrs = hours(self.interval)
self.Start = self.times[0].Start
self.End = self.times[-1].End
self.span = self.End - self.Start
self.spanhrs = hours(self.span)
# NOTE(review): the `initialTime` argument is not used in the visible body.
# Container protocol: all of these delegate to the underlying self.times.
def __repr__(self): return repr(self.times)
def __contains__(self, item): return item in self.times
def __len__(self): return len(self.times)
def __getitem__(self, i): return self.times[i]
# __getslice__ is only consulted by Python 2.
def __getslice__(self, i, j): return self.times[i:j]
def index(self,val): return self.times.index(val)
# Store the initial time (given, or a Time one interval before Start with
# index 'Init') and build wInitial: the initial time followed by all times.
def setInitial(self,initialTime=None):
if initialTime: self.initialTime= initialTime
else: self.initialTime = Time(Start=self.Start-self.interval, interval=self.interval,index='Init')
self.wInitial = tuple([self.initialTime] + list(self.times))
def subdivide(self,division_hrs=24,interval_hrs=None,overlap_hrs=0,offset_hrs=0):
Subdivide a list of times into serval stages, each stage
spanning `division_hrs` with intervals of `interval_hrs`.
:param hoursperdivision: time span of each stage (excluding overlap)
:param interval_hrs: (optional) time span of each interval
for each stage. If not specified, `intervalhrs` is used.
:param overlap_hrs: (optional) overlap between time stages, default is 0
typical use:
>>> t=make_times_basic(8)
>>> t.subdivide(division_hrs=4)
but should also be able to handle longer intervals:
>>> t.subdivide(division_hrs=4,interval_hrs=2)
and handle arbitary overlaps:
>>> t.subdivide(division_hrs=4,overlap_hrs=2)
# NOTE(review): the docstring names `hoursperdivision`, but the parameter is
# `division_hrs`; `offset_hrs` is not documented at all.
def timeslice(tStart,tEnd,index): return Time(Start=tStart,End=tEnd,index=index)
if interval_hrs is None: interval_hrs=self.intervalhrs
# Express span, overlap and offset as counts of intervals.
span_intervals= division_hrs/interval_hrs
overlap_intervals = overlap_hrs/interval_hrs
offset_intervals = offset_hrs/interval_hrs
# NOTE(review): `intervals` is iterated here and indexed below with the keys
# 'overlap' (and expanded as keyword arguments), but it is never built in the
# visible code; the three names computed above are otherwise unused.
for nm,val in intervals.items():
if val!=int(val):
msg='for time subdivision {nm} must be an integer number of intervals (is {val})'.format(nm=nm,val=val)
raise ValueError(msg)
# Nothing to do when each stage is exactly one native interval.
if interval_hrs==self.intervalhrs and division_hrs==self.intervalhrs: return self
# NOTE(review): this branch has no body in the visible code.
elif interval_hrs==self.intervalhrs:
elif interval_hrs>self.intervalhrs:
# NOTE(review): `steps` is read here but never assigned in the visible code.
if steps==int(steps): steps=int(steps)
else: raise ValueError('Native Timelist interval is i={i}, while proposed interval is j={j}. j/i must be an integer.'.format(i=self.intervalhrs,j=interval_hrs))
# Merge every `steps` consecutive native times into one longer Time.
longertimeL = Timelist([timeslice(self[i].Start, self[i+steps-1].End,i) for i in range(0,len(self)-steps+1,steps)])
newtimesL = divide_into_stages(longertimeL,**intervals)
# The first stage reuses this list's initial time; each later stage starts
# from a time near the end of the previous stage, skipping its overlap.
for t,stage in enumerate(newtimesL):
if t>0: stage.setInitial( newtimesL[t-1][-1-intervals['overlap']] )
elif t==0: stage.setInitial( self.initialTime )
stage.non_overlap_times = stage[:-1-int(intervals['overlap'])+1] if intervals['overlap']>0 else stage
#the last stage has no overlap and may not cover the whole division
# NOTE(review): as written this assignment overwrites the value set just
# above; the preceding comment suggests it was meant for the last stage only.
stage.non_overlap_times = stage
return newtimesL
def divide_into_stages(L, span=0, overlap=0, offset=0):
    """
    Divide list into span-sized chunks, with optional overlap.

    :param L: sliceable sequence of times.
    :param span: number of elements each stage advances by. Must be
      non-zero: the default of 0 makes ``range`` raise ValueError.
    :param overlap: number of extra elements appended to each stage from
      the start of the following one.
    :param offset: index of the first element of the first stage.
    :returns: list of Timelist objects, one per stage.
    """
    # NOTE(review): the source never initialised the result list; it is
    # built directly here. `range` replaces the Python 2-only `xrange`.
    # Slicing truncates at the end of L, so the last stage has no overlap
    # and can contain fewer than the full span of intervals.
    return [Timelist(L[i:i + span + overlap]) for i in range(offset, len(L), span)]
# NOTE(review): most of this function's body is missing. `fields`, `data`,
# `map_field_attr` and `data_power` are all read below but never assigned in
# the visible code, and `filename` is never used -- presumably a readCSV call
# and a field-name mapping were lost; restore from the upstream source.
def make_schedule(filename,times=None):
Read time and power information from spreadsheet file.
# Translate each normalised column header into an attribute name.
attributes=[map_field_attr[drop_case_spaces(f)] for f in fields]
# Without caller-supplied times, parse them from the column mapped to 'time'.
if times is None: times=make_times(parse_timestrings(transpose(data)[attributes.index('time')]))
return Schedule(times,data_power)
# NOTE(review): this class has lost its indentation, its docstring
# delimiters and parts of several statements. The code lines below are left
# untouched; each comment flags a fragment that must be restored from the
# upstream source.
class Schedule(object):
Describes a schedule of times and corresponding power values.
A container for a dictionary keyed by
:py:class:`~schedule.Time` objects.
# NOTE(review): the trailing `,P))` is the tail of a lost statement --
# presumably the one that builds the time-keyed dictionary; confirm.
def __init__(self,times=None,P=None):,P))
self.intervalhrs = times.intervalhrs
def __imul__(self,multiplier):
Multiplies each power value in schedule by a multiplier.
Usage: schedule*=.9
would give a schedule with 90 percent of the power.
# NOTE(review): the iterable of this loop and the target of the assignment
# are both missing from the line below.
for t,p in[t]=p*multiplier
return self
def times(self):
# NOTE(review): the first argument to sorted() is missing; the result is
# ordered by (Start, End).
return Timelist(sorted(,key=attrgetter('Start','End')))
def __repr__(self):
# NOTE(review): the iterable of the comprehension is missing.
return repr(sorted([(str(t),p) for t,p in]))
def get_energy(self,timeperiod):
get the amount of energy in a time period
if the timeperiod is in the schedule things are simple:
>>> times=Timelist([Time(Start='1:00'),Time(Start='2:00'),Time(Start='3:00')])
>>> energy=[100,210,100]
>>> s=Schedule(energy=energy,times=times)
>>> s.get_energy(times[0])
If the timeperiod spans multiple schedule times, energy should
be summed over each schedule time which the timeperiod contains:
>>> tLarger=Time(Start='1:00',End='3:00')
>>> s.get_energy(tLarger)
# NOTE(review): the doctest above calls Schedule(energy=..., times=...), but the
# visible __init__ signature has no `energy` parameter; the expected outputs of
# the examples are also absent.
# NOTE(review): the object being indexed by `timeperiod` is missing.
try: return[timeperiod]
except KeyError:
# NOTE(review): `self.interval` is not assigned anywhere in the visible class
# (only `self.intervalhrs` is).
if timeperiod.interval>self.interval:
#energy of time is sum all energy in times within
# NOTE(review): `period_Times` is never assigned in the visible code.
return sum([self.get_energy(t) for t in period_Times]) *self.intervalhrs
elif timeperiod.interval==self.interval:
# NOTE(review): `times` and `t` are not defined in this scope in the visible code.
return self.get_energy(times[t])
else: raise
def saveCSV(self,filename='schedule.csv'):
# NOTE(review): the first argument to sorted() is missing, and no write call
# is visible even though `filename` is accepted.
data=sorted(,key=lambda t_e: t_e[0].Start)
# Reduce each (time, value) pair to (start datetime, value).
data=[(row[0].Start,row[1]) for row in data]
# NOTE(review): the method bodies of this class are truncated: __init__ has
# no body, get_energy returns nothing, and the format() call in __repr__ is
# missing its argument and closing parenthesis. Restore from the upstream
# source.
class FixedSchedule(Schedule):
'''A simple "schedule" which has only one power output'''
def __init__(self,times=None,P=None):
# `timeperiod` is optional here, unlike Schedule.get_energy.
def get_energy(self,timeperiod=None): return
def __repr__(self): return 'FixedSchedule<energy={}>'.format(
def just_one_time():
    """Build a Timelist holding exactly one Time, for single-time problems.

    The single entry starts at '0:00' and has index 0.
    """
    only_time = Time(Start='0:00', index=0)
    return Timelist([only_time])
# NOTE(review): `fmt` is expanded as keyword arguments below but is never
# assigned in the visible body -- presumably it came from a helper such as the
# imported `getTimeFormat`; confirm against the upstream source.
# NOTE(review): the file imports only the top-level `dateutil` package;
# depending on the dateutil version, `dateutil.parser` may need an explicit
# `import dateutil.parser` to be available.
def parse_timestrings(timestringsL):
Convert list of strings to list of :py:class:`datetime.datetime`
objects using :meth:`dateutil.parser.parse`.
return [dateutil.parser.parse(string,**fmt) for string in timestringsL]
if __name__ == "__main__":
    # Run the docstring examples in this module when executed as a script.
    # The original imported doctest but never invoked it.
    import doctest
    doctest.testmod()
Jump to Line
Something went wrong with that request. Please try again.