"""
File: demo_timeseries.py
Author: Greg Harfst (greg at techguy in midtown.com)
Notes:
A demonstration of numpy arrays, masked arrays, and scikits.timeseries.
"""
import functools
import itertools as IT
import optparse
import random
import time
from operator import itemgetter

import numpy as NP
import numpy.ma as MA
import scikits.timeseries as TS
def processOptions(argv=None):
    """Process the command line options using 'optparse'.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line.  When None (the default) optparse reads
            sys.argv[1:], so existing zero-argument callers are unaffected.

    Returns:
        The optparse values object with the attributes numTickers,
        numDates, loadMA, loadTS, dim (set by --timeMultDim), timeListMult,
        timeNPMult and timeMAMult.  Positional arguments are parsed but
        discarded.
    """
    parser = optparse.OptionParser()

    # Sizes of the fake data set used by the --loadMA / --loadTS demos.
    parser.add_option("--numTickers", type='int', default=4000,
                      help="Number of fake tickers to generate")
    parser.add_option("--numDates", type='int', default=300,
                      help="Number of dates to generate")

    # Which loaders to exercise.
    parser.add_option("--loadMA", default=False, action="store_true",
                      help="Test loading data into MaskedArrays")
    parser.add_option("--loadTS", default=False, action="store_true",
                      help="Test loading data into timeseries objects")

    # Element-wise multiplication timings; 'dim' is used for both the row
    # count and the column count of the operands.
    parser.add_option("--timeMultDim", type='int', default=1000, dest='dim',
                      help="Number of rows and columns for our timed element-wise multiplication.")
    parser.add_option("--timeListMult", default=False, action="store_true",
                      help="Test element-wise multiplication of two 2d lists")
    parser.add_option("--timeNPMult", default=False, action="store_true",
                      help="Test element-wise multiplication of two 2d numpy ndarrays")
    parser.add_option("--timeMAMult", default=False, action="store_true",
                      help="Test element-wise multiplication of two 2d MaskedArrays")

    options, _ = parser.parse_args(argv)
    return options
def print_timing(func):
    """Decorator that prints the wall-clock duration of each call to 'func'.

    The wrapper forwards positional and keyword arguments unchanged, prints
    '<function name> took <elapsed> ms' to stdout once 'func' has returned,
    and then returns the result of 'func'.  Nothing is printed when 'func'
    raises; the exception simply propagates.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that keeps the name and docstring of 'func'.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t1 = time.time()
        res = func(*args, **kwargs)
        t2 = time.time()
        # A single parenthesised argument prints identically under the
        # Python 2 print statement and the Python 3 print function.
        print('%s took %0.3f ms' % (func.__name__, (t2 - t1) * 1000.0))
        return res
    return wrapper
def makeTwoDimMAArray(numRows, numCols):
    """Return the makeTwoDimList() table as a MaskedArray built with mask=False."""
    rawValues = NP.array(makeTwoDimList(numRows, numCols))
    return MA.MaskedArray(rawValues, mask=False)
def makeTwoDimNumpyArray(numRows, numCols):
    """Return the makeTwoDimList() table converted to a numpy array."""
    nestedRows = makeTwoDimList(numRows, numCols)
    return NP.array(nestedRows)
def makeTwoDimList(numRows, numCols):
    """Return a list of 'numRows' lists where entry [i][j] equals i * j.

    Each inner list has 'numCols' entries.
    """
    table = []
    for rowIndex in range(numRows):
        table.append([rowIndex * colIndex for colIndex in range(numCols)])
    return table
@print_timing
def elementwiseMultiply(l1, l2):
    """Multiply two 2d lists element by element (timed by print_timing).

    The dimensions are taken from 'l1': the row count is len(l1) and the
    column count is the length of its first row.  'l2' is indexed with the
    same positions.

    Args:
        l1: A list of rows (lists of numbers).
        l2: A list of rows indexed at the same [i][j] positions as 'l1'.

    Returns:
        A new list of lists holding l1[i][j] * l2[i][j].  An empty 'l1'
        yields an empty list.
    """
    numRows = len(l1)
    # An empty outer list has no first row to measure; treat it as zero
    # columns instead of failing with an IndexError on l1[0].
    numCols = len(l1[0]) if l1 else 0
    return [[l1[i][j] * l2[i][j] for j in range(numCols)] for i in range(numRows)]
@print_timing
def elementwiseNPMultiply(a1, a2):
    """Return a1 * a2; main() calls this with two numpy arrays to time the product."""
    product = a1 * a2
    return product
@print_timing
def elementwiseMAMultiply(a1, a2):
    """Return a1 * a2; main() calls this with two MaskedArrays to time the product."""
    product = a1 * a2
    return product
def generateFakeStockData(numTickers=4000, numDates=300):
    """A simple generator that yields random rows of type [string].

    Each yielded list contains:
    [date string, ticker, daily return, daily trading volume]
    The start of the stream is ragged -- some of the tickers might not appear
    initially. Also, there are periodic holidays, where there are no data.
    Furthermore, there are randomly missing daily returns or daily volumes,
    which are represented by empty strings. This approximates what we might
    get from a well-behaved, real world CSV file.

    Args:
        numTickers: Size of the candidate pool of three-letter tickers
            ('AAA', 'AAB', ...).  Only every 7th candidate is used, so
            roughly numTickers / 7 distinct tickers are produced (572 for
            the default of 4000).  The pool cannot exceed the 26**3 possible
            three-letter combinations.
        numDates: Number of consecutive date indices to walk through.  Every
            60th index (starting with index 0) is a holiday and yields no
            rows.

    Yields:
        [dateStr, ticker, dailyReturnStr, dailyVolumeStr], ordered by date
        and then by ticker.  dateStr is formatted as '%m/%d/%Y'.
    """
    # generate a list of tickers: every 7th three-letter combination among
    # the first 'numTickers' candidates, in lexicographic order
    letters = [chr(code) for code in range(65, 91)]
    candidates = IT.product(letters, repeat=3)
    tickers = [''.join(combo)
               for combo in IT.islice(candidates, 0, max(numTickers, 0), 7)]

    # calculate some random ticker-specific characteristics
    tickerVolumeMeans = {}
    tickerStartOffsets = {}
    for ticker in tickers:
        tickerVolumeMeans[ticker] = int(1e3 + 1e5 * random.lognormvariate(1.0, 1.5))
        tickerStartOffsets[ticker] = int(random.lognormvariate(0, 2))

    for dateIndex in range(numDates):
        if dateIndex % 60 == 0:
            continue  # holiday
        date = TS.Date('B', '1/1/2000') + dateIndex
        dateStr = date.strftime('%m/%d/%Y')
        for ticker in tickers:
            if dateIndex < tickerStartOffsets[ticker]:
                continue  # the ticker doesn't exist yet

            # daily return might be missing for no particular reason
            # (about 1 row in 5000)
            if int(random.uniform(0, 5000)) < 1:
                dailyReturnStr = ''
            else:
                dailyReturnStr = '%0.4f' % (1 - random.lognormvariate(0, 0.03))

            # daily volume might be missing for no particular reason
            # (about 1 row in 4000)
            if int(random.uniform(0, 4000)) < 1:
                dailyVolumeStr = ''
            else:
                volMean = tickerVolumeMeans[ticker]
                dailyVolumeStr = '%d' % int(random.normalvariate(volMean, volMean / 10))

            yield [dateStr, ticker, dailyReturnStr, dailyVolumeStr]
def _padColumn(column, length):
    """Extend 'column' with masked placeholder entries until it holds 'length' values."""
    shortfall = length - len(column['data'])
    if shortfall > 0:
        column['data'].extend([0] * shortfall)
        column['mask'].extend([True] * shortfall)


def _appendParsedValue(columns, ticker, dateCount, text, convert):
    """Parse 'text' with 'convert' and store it as the 'dateCount'-th entry of the ticker's column."""
    column = columns.setdefault(ticker, {'data': [], 'mask': []})
    # Mask every earlier date on which this ticker had no row.  This covers
    # ragged starts as well as gaps in the middle of the stream.
    _padColumn(column, dateCount)
    try:
        value, masked = convert(text), False
    except ValueError:
        # an empty (or otherwise unparsable) string marks a missing value
        value, masked = 0, True
    column['data'].append(value)
    column['mask'].append(masked)


@print_timing
def loadDataIntoMaskedArrays(iter):
    """Convert an iterator produced by 'generateFakeStockData()' into MaskedArray objects.

    The rows must arrive grouped by date (all rows for one date are
    consecutive) with at most one row per ticker and date.  A ticker that
    has no row for a date gets a masked entry for that date, whether the gap
    is at the start, in the middle or at the end of the stream.

    Args:
        iter: An iterable of [dateStr, ticker, dailyReturnStr, dailyVolumeStr]
            rows.  Strings that cannot be parsed as float (returns) or int
            (volumes) are stored as masked zeros.

    Returns:
        A tuple, (dateStrs, tickers, dailyReturnsMA, dailyVolumesMA), where
        dateStrs is a list of string representations of the dates for each row in the 2d MaskedArrays
        tickers is a sorted list of tickers that correspond to the columns of the 2d MaskedArrays
        dailyReturnsMA is a MaskedArray containing the returns for all tickers on all of the dates.
        dailyVolumesMA is a MaskedArray containing the volumes for all tickers on all of the dates.
    """
    dateStrs = []
    dailyReturns = {}  # map ticker -> {'data': [float], 'mask': [bool]}
    dailyVolumes = {}  # map ticker -> {'data': [int], 'mask': [bool]}

    for dateCount, (dateStr, rowsForDate) in enumerate(IT.groupby(iter, itemgetter(0))):
        dateStrs.append(dateStr)
        for _, ticker, dailyReturnStr, dailyVolumeStr in rowsForDate:
            _appendParsedValue(dailyReturns, ticker, dateCount, dailyReturnStr, float)
            _appendParsedValue(dailyVolumes, ticker, dateCount, dailyVolumeStr, int)

    tickers = sorted(dailyReturns.keys())
    numRows = len(dateStrs)

    def toMaskedGrid(columns):
        # Tickers that stopped reporting before the final date still need a
        # full-length column before the per-date rows can be assembled.
        for ticker in tickers:
            _padColumn(columns[ticker], numRows)
        return MA.MaskedArray(
            data=[[columns[ticker]['data'][i] for ticker in tickers] for i in range(numRows)],
            mask=[[columns[ticker]['mask'][i] for ticker in tickers] for i in range(numRows)])

    dailyReturnsMA = toMaskedGrid(dailyReturns)
    dailyVolumesMA = toMaskedGrid(dailyVolumes)
    return dateStrs, tickers, dailyReturnsMA, dailyVolumesMA
@print_timing
def loadDataIntoTimeseries(iter):
    """Convert an iterator produced by 'generateFakeStockData()' into scikits.timeseries objects.

    Rows are grouped by their date string with itertools.groupby, so all
    rows for one date are expected to be consecutive.  Return or volume
    strings that raise ValueError when parsed are recorded as masked zeros.

    Return a tuple, (tickers, dailyReturnsTSGrid, dailyVolumesTSGrid), where
    tickers is the sorted list of tickers that correspond to the columns of the grids
    dailyReturnsTSGrid is a timeseries containing the returns for all tickers on all of the dates.
    dailyVolumesTSGrid is a timeseries containing the volumes for all tickers on all of the dates.
    """
    dailyReturns = {}  # map ticker -> dates, returns (float) and masks (boolean)
    dailyVolumes = {}  # map ticker -> dates, volumes (int) and masks (boolean)

    for dateStr, rowsForDate in IT.groupby(iter, itemgetter(0)):
        date = TS.Date('B', dateStr)
        for row in rowsForDate:
            _, ticker, returnText, volumeText = row

            # handle ragged starts -- every ticker is not present for every date
            if ticker not in dailyReturns:
                dailyReturns[ticker] = {'dates': [], 'data': [], 'mask': []}
                dailyVolumes[ticker] = {'dates': [], 'data': [], 'mask': []}

            try:
                returnValue = float(returnText)
            except ValueError:
                returnValue, returnMasked = 0, True
            else:
                returnMasked = False

            try:
                volumeValue = int(volumeText)
            except ValueError:
                volumeValue, volumeMasked = 0, True
            else:
                volumeMasked = False

            observations = ((dailyReturns[ticker], returnValue, returnMasked),
                            (dailyVolumes[ticker], volumeValue, volumeMasked))
            for series, value, masked in observations:
                series['dates'].append(date)
                series['data'].append(value)
                series['mask'].append(masked)

    tickersR, dailyReturnsTSGrid = makeTimeseriesGrid(dailyReturns)
    tickersV, dailyVolumesTSGrid = makeTimeseriesGrid(dailyVolumes)
    # both dictionaries are always filled for the same tickers
    assert tickersR == tickersV
    return tickersR, dailyReturnsTSGrid, dailyVolumesTSGrid
def makeTimeseriesGrid(dailyDict):
    """Convert a dictionary of returns or volume data into a scikits.timeseries.time_series.

    'dailyDict' maps each ticker to a dict with the keys 'dates', 'data' and
    'mask'.  Returns (tickers, grid), where 'tickers' is the sorted list of
    keys and 'grid' stacks one column per ticker in that order.
    """
    tickers = sorted(dailyDict.keys())

    # one business-day ('B') timeseries per ticker, passed through
    # fill_missing_dates()
    dailyTSs = []
    for ticker in tickers:
        entry = dailyDict[ticker]
        dateArray = TS.DateArray(entry['dates'], freq='B')
        maskedValues = MA.MaskedArray(data=entry['data'], mask=entry['mask'])
        dailyTSs.append(TS.time_series(dates=dateArray, data=maskedValues).fill_missing_dates())

    # align all of the time series to fall on an identical set of dates
    # (TS.aligned() takes a variable-length argument list, not a list)
    alignedSeries = TS.aligned(*dailyTSs)

    # build a 2d timeseries using all of the individual timeseries objects
    dailyTSGrid = TS.time_series(dates=alignedSeries[0].dates,
                                 data=MA.column_stack([ts.series for ts in alignedSeries]))
    return tickers, dailyTSGrid
def main():
    """Run the demos selected on the command line.

    --loadMA and --loadTS each generate a fresh stream of fake stock data
    and time how long it takes to load.  --timeListMult, --timeNPMult and
    --timeMAMult each build two dim x dim operands and time their
    element-wise product.  The results are discarded; only the messages and
    the timings printed by print_timing matter.
    """
    options = processOptions()

    # Each message is printed as one pre-formatted string so the call works
    # as both a Python 2 print statement and a Python 3 print function.
    if options.loadMA:
        print("Load '%d' fake tickers over '%d' dates into a masked array" % (options.numTickers, options.numDates))
        stockRows = generateFakeStockData(numTickers=options.numTickers, numDates=options.numDates)
        loadDataIntoMaskedArrays(stockRows)
    if options.loadTS:
        print("Load '%d' fake tickers over '%d' dates into a timeseries" % (options.numTickers, options.numDates))
        stockRows = generateFakeStockData(numTickers=options.numTickers, numDates=options.numDates)
        loadDataIntoTimeseries(stockRows)

    n, m = options.dim, options.dim
    if options.timeListMult:
        print("Do elementwise multiplication of an %d x %d two dimensional list." % (n, m))
        l1 = makeTwoDimList(n, m)
        l2 = makeTwoDimList(n, m)
        elementwiseMultiply(l1, l2)
    if options.timeNPMult:
        print("Do elementwise multiplication of an %d x %d numpy array." % (n, m))
        a1 = makeTwoDimNumpyArray(n, m)
        a2 = makeTwoDimNumpyArray(n, m)
        elementwiseNPMultiply(a1, a2)
    if options.timeMAMult:
        print("Do elementwise multiplication of an %d x %d masked array." % (n, m))
        ma1 = makeTwoDimMAArray(n, m)
        ma2 = makeTwoDimMAArray(n, m)
        elementwiseMAMultiply(ma1, ma2)


# Run the demos only when executed as a script, not when imported.
if __name__ == "__main__":
    main()