In [None]:
# default_exp utils

# utils

> Assorted low-level utilities for a flexible SEC filings scanner.

In [None]:
#hide
%load_ext autoreload
%autoreload 2
from nbdev import show_doc

In [None]:
#export

from bs4 import BeautifulSoup, Comment, Doctype, NavigableString
import datetime
import gzip
import inspect
from io import BytesIO
import json
import os
import pickle
import re
from pytz import timezone
import requests
import sys
import time
import webbrowser
# xml.etree.cElementTree was deprecated in Python 3.3 and removed in Python 3.9;
# fall back to the regular ElementTree module under the same alias.
try :
    import xml.etree.cElementTree as cElTree
except ImportError :
    import xml.etree.ElementTree as cElTree

# boto3 is an optional dependency: record whether it could be imported so that
# S3-related code and tests can check boto3_available before using it.
# Only ImportError is treated as "not installed"; a bare except would also
# swallow KeyboardInterrupt/SystemExit.
try :
    import boto3
    boto3_available = True
except ImportError :
    boto3_available = False

## Set directory for scraped data
We store all scraped data under stockDataRoot.

In [None]:
#export
# Default root directory for scraped data: the secData folder under the user's home directory.
stockDataRoot = os.path.expanduser(os.path.join('~','secData'))
def setStockDataRoot(loc) :
    """
    Set location for storing scraped stock data.

    Rebinds the module-level stockDataRoot variable to loc.
    """
    global stockDataRoot
    stockDataRoot = loc

## Low-level download functions
We download SEC data using the requests library.
We retry a few times in case of a temporary internet glitch,
and also recognize an SEC-specific temporary outage message and raise an Exception for it
so that we can flag the problem and retry later.

In [None]:
#export

def requestUrl(url, timeout=5.0, nTries=5, returnText=False, headers=None, sleepTime=None, **kwargs) :
    """
    Downloads a URL using the requests package, making up to nTries attempts.

    - timeout and headers are passed through to requests.get
    - any extra keyword arguments are sent as URL query parameters
    - if sleepTime is not None, sleeps for the given time before every attempt
      to stay under max request rate limits
    - returns the response text if returnText is True, otherwise the response object

    Any exception from an attempt (including an HTTP error status) is printed;
    the exception from the last attempt is re-raised.
    If nTries is less than 1, no request is made and None is returned.
    """
    for i in range(nTries) :
        isLastTry = (i >= nTries-1)
        try :
            if sleepTime is not None :
                time.sleep(sleepTime)
            r = requests.get(url, headers=headers, timeout=timeout, params=kwargs)
            r.raise_for_status()
            return r.text if returnText else r
        except Exception as e :
            if isLastTry :
                # no attempts left: don't claim that we are retrying
                print('*** Problem','downloading',url,'-',e)
                print('*** UNABLE TO DOWNLOAD ***')
                raise
            print('*** Problem','downloading',url,'-',e,'; retrying ...')

# URL prefixes: the main SEC site, and the host used when downloadSecUrl is called with restData=True.
secUrlPref = 'https://www.sec.gov'
secRestDataPref = 'https://data.sec.gov'
# Headers sent with SEC requests; starts out as a copy of the requests library's default headers.
secHeaders = dict(requests.utils.default_headers())
def setSecUserAgent(agentStr) :
    """
    Should be used to set user agent to your email address for requests to the SEC site,
    to help avoid throttling.

    Updates the 'User-Agent' entry of the module-level secHeaders dict in place.
    """
    secHeaders['User-Agent'] = agentStr
    #secHeaders['Host'] = 'www.sec.gov'
    #secHeaders['Accept-Encoding'] = 'gzip, deflate'
# Install a placeholder user agent at import time; call setSecUserAgent again to override it.
setSecUserAgent('secscantest@secscan.com')
secSleepTime = 0.1 # pause between SEC requests to stay under SEC max request rate (currently 10/sec)
sys.setrecursionlimit(2000) # some filings have deeply nested HTML

accessNoPatStr = r'\d{10}-\d+-\d+'
accessNoPat = re.compile(accessNoPatStr)
def secIndexUrl(accessNo, includePref=False) :
    """
    Builds the URL of the index page for the SEC filing with the given accession number.
    The result is a site-relative path unless includePref is True, in which case
    it is prefixed with secUrlPref.
    """
    prefix = secUrlPref if includePref else ''
    # the directory name is the accession number with its dashes removed
    folder = accessNo.replace('-','')
    return f'{prefix}/Archives/edgar/data/{folder}/{accessNo}-index.htm'

# from bs4.element import Comment
# def tag_visible(element):
#     if element.parent.name in ['style', 'script', 'head', 'title', 'meta', '[document]'] :
#         return False
#     if isinstance(element, Comment):
#         return False
#     return True
# def getCombSoupText(tag) :
#     "Get the combined text from a BeautifulSoup tag."
#     texts = tag.findAll(text=True)
#     texts = filter(tag_visible,texts)
#     return u" ".join(t.strip() for t in texts)
# Matches any run of whitespace; getCombSoupText uses it to collapse each run to a single space.
spacesPat = re.compile(r'\s+')
# def getCombSoupText(tag) :
#     "Get the combined text from a BeautifulSoup tag."
#     return spacesPat.sub(" "," ".join(tag.stripped_strings))
def appendSpace(resL) :
    """
    Appends a single space to the list of text fragments resL,
    unless its last fragment already is a single space.
    An empty list is treated as not ending in a space (a space is appended)
    rather than raising IndexError.
    """
    if not resL or resL[-1] != ' ' :
        resL.append(' ')
tagsWithLeftSpace = tagsWithRightSpace = {'p','br','div','table','tr','td','li','pre','code'}
def getCombTextRec(soup, resL) :
    if isinstance(soup,Comment) or isinstance(soup,Doctype) :
        return
    if isinstance(soup,NavigableString) :
        s = soup.string.lstrip()
        if s != soup.string :
            appendSpace(resL)
        ss = s.rstrip()
        if ss != '' :
            resL.append(ss)
        if ss != s :
            resL.append(' ')
        return
    if soup.name in tagsWithLeftSpace :
        appendSpace(resL)
    for c in soup.children :
        getCombTextRec(c,resL)
    if soup.name in tagsWithRightSpace :
        appendSpace(resL)
def getCombSoupText(soup) :
    """
    Gets the combined text under a BeautifulSoup node as a single string,
    with every run of whitespace collapsed to one space and the ends stripped.
    """
    # seed with a space so that appendSpace always has a last fragment to look at
    pieces = [' ']
    getCombTextRec(soup,pieces)
    joined = ''.join(pieces)
    return spacesPat.sub(" ",joined).strip()

def prTree(soup, level=0) :
    """
    Debugging aid: prints the tree under a BeautifulSoup node, one node per line,
    indented with one '|' per nesting level. Strings are labelled
    COMMENT, DOCTYPE or TEXT; all other nodes are printed as TAG plus their name.
    """
    indent = level*'|'
    if not isinstance(soup,NavigableString) :
        print(indent+'TAG'+repr(soup.name))
        for child in soup.children :
            prTree(child,level+1)
        return
    if isinstance(soup,Comment) :
        label = 'COMMENT'
    elif isinstance(soup,Doctype) :
        label = 'DOCTYPE'
    else :
        label = 'TEXT'
    print(indent+label, repr(soup.string))
def prAllTagNames(soup) :
    """
    Debugging aid: prints the sorted list of distinct tag names among the descendants of soup.

    soup.descendants also yields non-tag nodes such as text strings; descendants
    that have no name attribute, or whose name is None, are skipped so that
    collecting and sorting the names cannot fail on them.
    """
    names = set()
    for node in soup.descendants :
        name = getattr(node,'name',None)
        if name is not None :
            names.add(name)
    print(sorted(names))

pageUnavailablePat = re.compile('page is temporarily unavailable',re.IGNORECASE)
def downloadSecUrl(secSubUrlOrAccessNo, toFormat='text', sleepTime=0.1, restData=False) :
    """
    Downloads a page from the SEC site and optionally parses it.

    The page is given either as a sub-URL relative to the site root
    (ex. /cgi-bin/browse-edgar?CIK=0000716314&owner=exclude) or as an
    accession number (ex. 0001193125-21-181366), which selects the index page
    of that filing. A sub-URL starting with /ix? is reduced to the part
    starting at its next '/'. If restData is True the page is fetched from
    secRestDataPref instead of secUrlPref.

    Result, depending on toFormat:
    - 'soup' - a BeautifulSoup object (html.parser)
    - 'souptext' - the combined text of the parsed BeautifulSoup object
    - 'json' - the result of json.loads
    - 'xml' - the result of fromstring from the ElementTree module imported as cElTree
    - anything else - the page text, unparsed

    SEC-specific behavior:

    If sleepTime is not None, sleeps for the given time before each request to
    stay under the SEC site's maximum request rate (currently 10 requests/second).

    Raises an Exception if the page contains the SEC's temporary outage
    message, so that the caller can detect the problem and retry later.
    """
    if accessNoPat.match(secSubUrlOrAccessNo) :
        secSubUrl = secIndexUrl(secSubUrlOrAccessNo)
    elif secSubUrlOrAccessNo.startswith('/ix?') :
        # keep only the path that follows the /ix? viewer prefix
        secSubUrl = secSubUrlOrAccessNo[secSubUrlOrAccessNo.index('/',1):]
    else :
        secSubUrl = secSubUrlOrAccessNo
    urlPref = secRestDataPref if restData else secUrlPref
    urlContents = requestUrl(urlPref + secSubUrl, returnText=True,
                             headers=secHeaders, sleepTime=sleepTime)
    if pageUnavailablePat.search(urlContents) is not None :
        raise Exception('temporary SEC outage')
    if toFormat=='soup' or toFormat=='souptext' :
        parsed = BeautifulSoup(urlContents,'html.parser')
        return parsed if toFormat=='soup' else getCombSoupText(parsed)
    if toFormat=='json' :
        return json.loads(urlContents)
    if toFormat=='xml' :
        return cElTree.fromstring(urlContents)
    return urlContents

In [None]:
# uc = requestUrl("https://www.sec.gov/Archives/edgar/data/1165002/000116500221000068/a2q21earningsrelease.htm",
#                 returnText=True, headers=secHeaders)
# prTree(BeautifulSoup(uc,'html.parser'))
# getCombSoupText(BeautifulSoup(uc,'html.parser'))

Test downloading from the SEC website:

In [None]:
# Smoke test (needs network access): an empty sub-URL downloads the page at secUrlPref as text.
t = downloadSecUrl('')
assert 'securities and exchange' in t.lower(), "SEC main page download"

In [None]:
# Sample accession number, also used by the next cell.
accessNo = '0000909108-21-000051'
# Dropping the first character leaves only 9 leading digits, which the pattern must reject.
assert accessNoPat.match(accessNo) and not accessNoPat.match(accessNo[1:])

In [None]:
# Needs network access: downloads the filing's index page, parses it to soup,
# and checks the text extracted from the companyName span.
print('Accession number',accessNo)
s = downloadSecUrl(accessNo, toFormat='soup')
s = s.find('span','companyName')
print('Company name HTML:',s)
companyName = getCombSoupText(s)
print('Company name text:',companyName)
assert companyName.lower().startswith('diamond hill'),'Parsing company name from filing index.htm'

Accession number 0000909108-21-000051
Company name HTML: <span class="companyName">DIAMOND HILL INVESTMENT GROUP INC (Filer)
 <acronym title="Central Index Key">CIK</acronym>: <a href="/cgi-bin/browse-edgar?CIK=0000909108&amp;action=getcompany">0000909108 (see all company filings)</a></span>
Company name text: DIAMOND HILL INVESTMENT GROUP INC (Filer) CIK: 0000909108 (see all company filings)


## `delegates` decorator

A useful decorator that avoids having to repeat lists of optional keyword arguments in function signatures and docstrings.

Adapted from the original design by Jeremy Howard in https://www.fast.ai/2019/08/06/delegation/

In [None]:
# export

def delegates(*toFuncs, keepKwargs=False):
    """
    Decorator to specify that a function delegates to one or more delegated functions.
    This will:

    - replace the `**kwargs` parameter in the delegating function's signature with the
      combined keyword arguments from the delegated functions, so that these keyword
      arguments are visible using autocomplete in a Jupyter environment

    - add the docstrings for the delegated functions to the end of the delegating function's
      docstring, so the usage documentation for the delegated functions is also visible.

    Only parameters of the delegated functions that have a default value are added,
    as keyword-only parameters; a parameter whose name is already in the delegating
    function's signature is not added again. If keepKwargs is True, the `**kwargs`
    parameter is kept at the end of the new signature.

    The delegating function's variable-keyword parameter is found by its kind, so it
    may have any name. Raises TypeError if the delegating function has no such parameter.
    """
    def _decorator(fromFunc):
        sigFrom = inspect.signature(fromFunc)
        sigFromDict = dict(sigFrom.parameters)
        # find the **kwargs-style parameter by kind rather than by the literal name 'kwargs'
        kwargsParam = next((param for param in sigFromDict.values()
                            if param.kind == inspect.Parameter.VAR_KEYWORD), None)
        if kwargsParam is None :
            raise TypeError(f'{fromFunc.__qualname__} has no ** keyword parameter to delegate')
        del sigFromDict[kwargsParam.name]
        delegatedDict = {}
        docStrs = []
        if fromFunc.__doc__ is not None :
            docStrs.append(fromFunc.__doc__)
        for toFunc in toFuncs :
            argL = []
            for name,param in inspect.signature(toFunc).parameters.items() :
                # identity test against the sentinel: never invokes the default value's own
                # comparison operators
                if param.default is not inspect.Parameter.empty and name not in sigFromDict :
                    delegatedDict[name] = param.replace(kind=inspect.Parameter.KEYWORD_ONLY)
                    argL.append(f'{name}={param.default}')
            docStrs.append('---')
            docStrs.append(f'{toFunc.__qualname__} arguments: ' + ', '.join(argL))
            if toFunc.__doc__ is not None :
                docStrs.append(toFunc.__doc__)
        sigFromDict.update(delegatedDict)
        if keepKwargs:
            # re-inserted last, as a variable-keyword parameter must be
            sigFromDict[kwargsParam.name] = kwargsParam
        fromFunc.__signature__ = sigFrom.replace(parameters=sigFromDict.values())
        fromFunc.__doc__ = '\n'.join(docStrs)
        return fromFunc
    return _decorator

def callDelegated(toFunc, kwargs, *args, **extraKwargs) :
    """
    Call a delegated function. This needs to be used from within a delegating function
    if more than one function was delegated to, in order to select the optional arguments
    from kwargs that apply to the delegated function.

    Every parameter of toFunc that has a default value is passed by keyword, taking its
    value from kwargs if present and from the default otherwise; entries of kwargs that
    toFunc does not declare are ignored. args are passed positionally, and extraKwargs
    override any values selected from kwargs. Returns the result of toFunc.
    """
    # identity test against the sentinel, so the default value's own comparison
    # operators are never invoked
    delegatedKwargs = {name : kwargs.get(name, param.default)
                       for name,param in inspect.signature(toFunc).parameters.items()
                       if param.default is not inspect.Parameter.empty}
    delegatedKwargs.update(extraKwargs)
    return toFunc(*args,**delegatedKwargs)

def checkDelegated(*toFuncs, **kwargs) :
    """
    Raises TypeError if kwargs contains any unexpected keyword arguments, i.e. any name
    that is not a parameter with a default value in at least one of the delegated
    functions toFuncs. Returns None otherwise.
    """
    allKws = set()
    for toFunc in toFuncs :
        # identity test against the sentinel, so the default value's own comparison
        # operators are never invoked
        allKws.update(name for name,param in inspect.signature(toFunc).parameters.items()
                      if param.default is not inspect.Parameter.empty)
    for name,val in kwargs.items() :
        if name not in allKws :
            raise TypeError(f'unexpected keyword argument {name}={val}')

In [None]:
# test delegating to one function:
# (the docstrings below are part of the test: the assert compares the combined __doc__ word by word)

def aaa(bbb, ccc=20, ddd=30) :
    """
    doc for aaa
    """
    return bbb + 2*ccc + 3*ddd

@delegates(aaa)
def test(a, **kwargs) :
    """
    doc for test
    """
    return a + aaa(3*a, **kwargs)

# checks the combined docstring, a call using only defaults, and a call overriding both delegated arguments
assert (test.__doc__.split()==['doc', 'for', 'test',
                               '---', 'aaa', 'arguments:', 'ccc=20,', 'ddd=30', 'doc', 'for', 'aaa']
        and test(99) == 99 + 3*99 + 2*20 + 3*30
        and test(88, ccc=22, ddd=33) == 88 + 3*88 + 2*22 + 3*33
       )

In [None]:
# test delegating to two functions:
# (uses aaa from the previous test cell; the docstrings are compared word by word in the assert)

def xxx(yyy, zzz, www=100, uuu=200, vvv=300) :
    """
    doc
    for xxx
    """
    return yyy + 2*zzz + 3*www + 4*uuu + 5*vvv

@delegates(aaa,xxx)
def test2(a, **kwargs) :
    """
    doc for test2
    """
    # callDelegated picks out of kwargs only the arguments each target declares;
    # the third call also overrides uuu explicitly
    return (callDelegated(aaa, kwargs, a),
            callDelegated(xxx, kwargs, a, a*a),
            callDelegated(xxx, kwargs, a, a*a, uuu=999))

assert (test2.__doc__.split() == ['doc', 'for', 'test2',
                                  '---', 'aaa', 'arguments:', 'ccc=20,', 'ddd=30', 'doc', 'for', 'aaa',
                                  '---', 'xxx', 'arguments:', 'www=100,', 'uuu=200,', 'vvv=300',
                                  'doc', 'for', 'xxx']
        and test2(77) == (77 + 2*20 + 3*30,
                          77 + 2*77*77 + 3*100 + 4*200 + 5*300,
                          77 + 2*77*77 + 3*100 + 4*999 + 5*300)
        and test2(77, uuu=654, vvv=876, ccc=55) == (77 + 2*55 + 3*30,
                          77 + 2*77*77 + 3*100 + 4*654 + 5*876,
                          77 + 2*77*77 + 3*100 + 4*999 + 5*876)
       )

In [None]:
# test using with classes:
# (uses aaa from the earlier test cell)

class a() :
    def __init__(self, aa=10, bb=20) :
        print(f'a init aa={aa} bb={bb}')
        self.x = aa + 2*bb

class aa(a) :
    @delegates(a.__init__)
    def __init__(self, d, **kwargs) :
        print(f'aa init d={d} kwargs={kwargs}')
        super().__init__(**kwargs)
        self.x += 3*d
    # mm forwards its keyword arguments to aaa, so aaa is the function it delegates to
    @delegates(aaa)
    def mm(self,**kwargs) :
        return self.x + aaa(1000,**kwargs)

testaa1 = aa(99)
testaa2 = aa(99, bb=100)
assert (testaa1.x == 3*99 + 10 + 2*20
        and testaa2.x == 3*99 + 10 + 2*100
        and testaa2.mm() == testaa2.x + 1000 + 2*20 + 3*30
        and testaa2.mm(ccc=50) == testaa2.x + 1000 + 2*50 + 3*30
       )

aa init d=99 kwargs={}
a init aa=10 bb=20
aa init d=99 kwargs={'bb': 100}
a init aa=10 bb=100


## Low-level functions for storing scraped data
We store scraped data in pickled format,
either storing an object in a single pickled file
or storing a dict by saving one file per key
(for example, one file per date).
We can optionally use gzip compression (smaller files, but slower to read).

In-memory pickling:

In [None]:
#export

def compressGZipBytes(b) :
    "Returns the gzip-compressed form of a byte string, compressing it in memory."
    buf = BytesIO()
    gz = gzip.GzipFile(fileobj=buf, mode="w")
    try :
        gz.write(b)
    finally :
        # closing writes the gzip trailer into buf
        gz.close()
    return buf.getvalue()

def decompressGZipBytes(b) :
    "Returns the decompressed contents of a gzip-compressed byte string, decompressing it in memory."
    gz = gzip.GzipFile(fileobj=BytesIO(b), mode="r")
    try :
        return gz.read()
    finally :
        gz.close()

@delegates(pickle.dumps)
def pickleToBytes(ob, use_gzip=False, **kwargs) :
    "Pickle an object to a byte string in memory; gzip-compress the result if use_gzip is True."
    pickled = pickle.dumps(ob, **kwargs)
    return compressGZipBytes(pickled) if use_gzip else pickled

@delegates(pickle.loads)
def pickleFromBytes(b, use_gzip=False, **kwargs) :
    "Unpickle an object from a byte string in memory; gzip-decompress the bytes first if use_gzip is True."
    # NOTE(review): pickle.loads can execute arbitrary code - only use on bytes from a trusted source
    raw = decompressGZipBytes(b) if use_gzip else b
    return pickle.loads(raw, **kwargs)

Test in-memory pickling:

In [None]:
import random
# seeded generator, so the test data is the same on every run
rng = random.Random(42)
# sample dict with str keys; reused by the later storage test cells
test_rand = dict((f'r{i}', rng.random()) for i in range(10))
b = pickleToBytes(test_rand)
assert test_rand == pickleFromBytes(b), 'pickling an object to bytes (no compression)'
b = pickleToBytes(test_rand, use_gzip=True)
assert test_rand == pickleFromBytes(b, use_gzip=True), 'pickling an object to bytes (gzip compression)'
# also exercises a keyword argument delegated to pickle.dumps
b = pickleToBytes(test_rand, use_gzip=True, protocol=2)
assert test_rand == pickleFromBytes(b, use_gzip=True), 'pickling an object to bytes (gzip compression)'

Pickled data storage to single files:

In [None]:
#export

@delegates(pickleToBytes)
def pickSave(fpath, ob, **kwargs) :
    "Save a pickled object to a file, optionally using gzip compression."
    # Pickle before opening the file: opening with 'wb' truncates it, so pickling
    # first means a pickling error cannot wipe out an existing file.
    data = pickleToBytes(ob, **kwargs)
    with open(fpath, 'wb') as f :
        f.write(data)

@delegates(pickleFromBytes)
def pickLoad(fpath, **kwargs) :
    "Read a file and unpickle its contents, optionally gzip-decompressing them first."
    with open(fpath, 'rb') as f :
        data = f.read()
    return pickleFromBytes(data, **kwargs)

@delegates(pickLoad)
def pickLoadIfPath(path_or_ob, **kwargs) :
    """
    Returns the object pickled at path_or_ob if it is a str (treated as a file path);
    any other argument is assumed to be an already loaded object and is returned unchanged.
    """
    if not isinstance(path_or_ob,str) :
        return path_or_ob
    return pickLoad(path_or_ob, **kwargs)

Test pickled data storage to single files:

In [None]:
# Round-trips test_rand through test.pkl in the current directory, without and with gzip.
pickSave('test.pkl', test_rand)
assert test_rand == pickLoad('test.pkl'), 'pickling an object to a file (no compression)'
pickSave('test.pkl', test_rand, use_gzip=True)
assert test_rand == pickLoad('test.pkl', use_gzip=True), 'pickling an object to a file (gzip compression)'
# NOTE(review): presumably the pause lets the OS release the file before deleting it - confirm it is needed
time.sleep(1)
os.unlink('test.pkl')

Pickled data storage to S3:

In [None]:
#export

@delegates(pickleToBytes)
def pickSaveToS3(bucket, key, ob, make_public=False, s3=None, **kwargs) :
    """
    Pickle an object and store it under key in an S3 bucket, optionally using gzip compression.
    A new boto3 S3 client is created unless one is passed as s3.
    If make_public is True the object is stored with the 'public-read' ACL.
    """
    client = boto3.client('s3') if s3 is None else s3
    putArgs = {'Bucket' : bucket, 'Key' : key, 'Body' : pickleToBytes(ob, **kwargs)}
    if make_public :
        putArgs['ACL'] = 'public-read'
    client.put_object(**putArgs)

@delegates(pickleFromBytes)
def pickLoadFromS3(bucket, key, s3=None, **kwargs) :
    """
    Fetch the object stored under key in an S3 bucket and unpickle it, optionally using gzip compression.
    A new boto3 S3 client is created unless one is passed as s3.
    """
    client = boto3.client('s3') if s3 is None else s3
    body = client.get_object(Bucket=bucket, Key=key)['Body']
    return pickleFromBytes(body.read(), **kwargs)

@delegates(pickleFromBytes)
def pickLoadFromS3Public(bucket, key, **kwargs) :
    """
    Load a pickled object from an S3 bucket by downloading it from the bucket's
    https://<bucket>.s3.amazonaws.com/<key> URL (no boto3 client is used),
    optionally using gzip compression.
    """
    # NOTE(review): this unpickles bytes downloaded over HTTP; unpickling can execute
    # arbitrary code, so only use this with buckets whose contents you trust
    s3PublicUrl = ''.join(('https://',bucket,'.s3.amazonaws.com/',key))
    response = requestUrl(s3PublicUrl)
    return pickleFromBytes(response.content, **kwargs)

Testing pickled data storage to S3:

In [None]:
def testS3Pickle(bucket, key, ob) :
    """
    Round-trips ob through the given S3 bucket/key, without and with gzip compression.
    Returns True if both round trips give back an equal object, or if boto3 is not
    available (the test is then skipped); returns False otherwise.
    """
    if not boto3_available :
        print('boto3 not available, skipping test')
        return True
    s3 = boto3.client('s3')
    # delete the key first, before saving to it
    s3.delete_object(Bucket=bucket, Key=key)
    pickSaveToS3(bucket, key, ob)
    if ob != pickLoadFromS3(bucket, key) :
        return False
    pickSaveToS3(bucket, key, ob, use_gzip=True)
    if ob != pickLoadFromS3(bucket, key, use_gzip=True) :
        return False
    return True

# change thenycentral to your S3 bucket name and set up boto3 to run the test
assert testS3Pickle('thenycentral', 'test_rand.pkl', test_rand), 'pickling an object to S3'

boto3 not available, skipping test


Pickled data storage under directory:

In [None]:
#export

@delegates(pickSave)
def savePklToDir(toDir, fName, ob, **kwargs) :
    """
    Saves a pickled object to a file under a directory, optionally using gzip compression.
    Creates the directory if it doesn't exist.
    """
    # exist_ok avoids the race between checking for the directory and creating it
    os.makedirs(toDir, exist_ok=True)
    pickSave(os.path.join(toDir, fName), ob, **kwargs)

@delegates(pickLoad)
def loadPklFromDir(fromDir, fName, defaultVal, **kwargs) :
    """
    Unpickles and returns the object stored in file fName under directory fromDir,
    optionally using gzip compression; returns defaultVal if that file doesn't exist.
    """
    fpath = os.path.join(fromDir, fName)
    if not os.path.exists(fpath) :
        return defaultVal
    return pickLoad(fpath, **kwargs)

Test pickled data storage under directory:

In [None]:
# Round-trips test_rand through testdirpkl/test.pkl (the directory is created by savePklToDir), then cleans up.
savePklToDir('testdirpkl','test.pkl', test_rand)
assert test_rand == loadPklFromDir('testdirpkl','test.pkl',None)
# NOTE(review): presumably the pause lets the OS release the file before deleting it - confirm it is needed
time.sleep(1)
os.unlink(os.path.join('testdirpkl','test.pkl'))
os.rmdir('testdirpkl')

Pickled dict storage split by key:

In [None]:
#export

@delegates(pickSave)
def saveSplitPklToDir(m, toDir, fSuff='m.pkl', dirtySet=None, **kwargs) :
    """
    Saves a dict with str keys to a separate file for each key;
    the file for key k is named k+fSuff and stored under toDir,
    which is created if it doesn't exist.
    If dirtySet is True, saves all keys.
    If dirtySet is None (default), saves only keys that don't yet have a file saved.
    Otherwise, also saves keys k in dirtySet.
    """
    # exist_ok avoids the race between checking for the directory and creating it
    os.makedirs(toDir, exist_ok=True)
    for k in sorted(m.keys()) :
        fPath = os.path.join(toDir, k+fSuff)
        needToSave = (dirtySet is True
                      or not os.path.exists(fPath)
                      or (dirtySet is not None and k in dirtySet))
        if needToSave :
            pickSave(fPath, m[k], **kwargs)

@delegates(pickLoad)
def loadSplitPklFromDir(fromDir, startK=None, endK=None, fSuff='m.pkl', **kwargs) :
    """
    Loads a pickled dict with str keys stored with a separate file for each key,
    optionally restricting to keys in [startK .. endK)

    Every file under fromDir whose name ends with fSuff is loaded; its key is the
    file name with fSuff removed. Returns an empty dict if fromDir doesn't exist.
    """
    m = {}
    if not os.path.exists(fromDir) :
        return m
    fNames = sorted(fName for fName in os.listdir(fromDir)
                    if fName.endswith(fSuff))
    for fName in fNames :
        # slice by explicit length: fName[:-len(fSuff)] would give '' for an empty fSuff
        fPref = fName[:len(fName)-len(fSuff)]
        if ((startK is not None and fPref<startK)
                or (endK is not None and endK<=fPref)) :
            continue
        m[fPref] = pickLoad(os.path.join(fromDir,fName), **kwargs)
    return m

Test pickled dict storage split by key:

In [None]:
# Saves test_rand as one file per key under testsplitpkl, then loads it back in full
# and restricted to the key range ['r3' .. 'r7').
saveSplitPklToDir(test_rand, 'testsplitpkl')
assert test_rand == loadSplitPklFromDir('testsplitpkl')
test_sub = dict((k,v) for k,v in test_rand.items() if 'r3'<=k<'r7')
assert test_sub == loadSplitPklFromDir('testsplitpkl',startK='r3',endK='r7')
# NOTE(review): presumably the pause lets the OS release the files before deleting them - confirm it is needed
time.sleep(1)
# clean up: one file per key ('m.pkl' is the default fSuff), then the directory itself
for k in test_rand.keys() :
    os.unlink(os.path.join('testsplitpkl',k+'m.pkl'))
os.rmdir('testsplitpkl')

## YYYYMMDD Date strings
A few functions for working with date strings in the format YYYYMMDD, as used in some SEC URLs.

In [None]:
#export

def addMissingOnesF(dateStr) :
    """
    Pads a partial date string to 8 characters: a 4-character string (YYYY) gets '0101'
    appended and a 6-character string (YYYYMM) gets '01' appended.
    Strings of any other length are returned unchanged.
    """
    suffix = {4 : '0101', 6 : '01'}.get(len(dateStr))
    if suffix is None :
        return dateStr
    return dateStr + suffix

def toDateStr(d=None, addMissingOnes=False) :
    """
    Converts its argument to a date string in YYYYMMDD format:
    - a str has its '-' and '/' characters removed (so YYYYMMDD strings are unchanged)
    - None means the current US Eastern date
    - an int means the current US Eastern date plus that many days
    - anything else is expected to be a date object, converted via its isoformat()
    If addMissingOnes is True, the result is passed through addMissingOnesF
    to complete YYYY or YYYYMM strings.
    """
    if isinstance(d,str) :
        rawStr = d
    else :
        if d is None :
            dateOb = curEasternUSTime()
        elif isinstance(d,int) :
            dateOb = curEasternUSTime() + datetime.timedelta(d)
        else :
            dateOb = d
        # the first 10 characters of the ISO format are YYYY-MM-DD
        rawStr = dateOb.isoformat()[:10]
    compact = rawStr.replace('-','').replace('/','')
    return addMissingOnesF(compact) if addMissingOnes else compact

dateStr8Pat = re.compile(r"(\d\d\d\d)(\d\d)(\d\d)$")
def toDate(d=None, addMissingOnes=False) :
    """
    Converts its argument to a date object:
    - a str in ISO or YYYYMMDD format becomes a datetime.date ('-' and '/' are removed
      first, and YYYY or YYYYMM strings are completed if addMissingOnes is True);
      raises an Exception if the result is not 8 digits
    - None gives the current US Eastern time, as returned by curEasternUSTime()
    - an int gives the current US Eastern time plus that many days
    - anything else is assumed to already be a date object and is returned unchanged
    """
    if d is None :
        return curEasternUSTime()
    if isinstance(d,str) :
        compact = d.replace('-','').replace('/','')
        if addMissingOnes :
            compact = addMissingOnesF(compact)
        found = dateStr8Pat.match(compact)
        if found is None :
            raise Exception('invalid date str "'+d+'"')
        year,month,day = (int(part) for part in found.groups())
        return datetime.date(year,month,day)
    if isinstance(d,int) :
        return curEasternUSTime() + datetime.timedelta(d)
    return d

def isWeekend(d) :
    "Returns True if the given date string or date object falls on a Saturday or Sunday."
    dayOfWeek = toDate(d).weekday()   # Monday is 0 ... Sunday is 6
    return dayOfWeek >= 5

def dateStrsBetween(d1,d2=None,excludeWeekends=False) :
    """
    Lists the dates from d1 (inclusive) up to d2 (exclusive) as YYYYMMDD strings,
    one per day in increasing order, leaving out Saturdays and Sundays if
    excludeWeekends is True. d1 and d2 are converted by toDate / toDateStr.
    """
    cur = toDate(d1)
    endStr = toDateStr(d2)
    oneDay = datetime.timedelta(1)
    res = []
    curStr = toDateStr(cur)
    # YYYYMMDD strings sort in date order, so plain string comparison is enough
    while curStr < endStr :
        if not excludeWeekends or not isWeekend(cur) :
            res.append(curStr)
        cur = cur + oneDay
        curStr = toDateStr(cur)
    return res

def formatDateStr(dStr,sep='-') :
    "Convert YYYYMMDD format date string to YYYY-MM-DD, or to year, month and day joined by another separator sep."
    year, month, day = dStr[:4], dStr[4:6], dStr[6:8]
    return sep.join([year, month, day])

Test YYYYMMDD date string functions:

In [None]:
# a bare year is completed to January 1st when addMissingOnes is True
assert toDateStr('2022',True)=='20220101'
# the end date is exclusive, and the range can cross a year boundary
assert dateStrsBetween('20201230','20210103')==['20201230', '20201231', '20210101', '20210102']
# 20210102 and 20210103 are a Saturday and a Sunday
assert dateStrsBetween('20201231','20210106',excludeWeekends=True)==['20201231', '20210101', '20210104', '20210105']
assert formatDateStr('20200630')=='2020-06-30'
assert formatDateStr('20200630','/')=='2020/06/30'
# Saturday, Sunday, Monday
assert (isWeekend('20210605'),isWeekend('20210606'),isWeekend('20210607')) == (True,True,False)

## Get current Eastern US time
This is used to control when to check for SEC filings.

In [None]:
#export

# pytz timezone object for the US/Eastern zone
easternUSTimeZone = timezone('US/Eastern')
def curEasternUSTime() :
    "Returns the current time as a timezone-aware datetime in the US/Eastern time zone."
    return datetime.datetime.now(easternUSTimeZone)

In [None]:
# show the current US Eastern time in ISO format (includes the UTC offset)
print(curEasternUSTime().isoformat())

2021-06-19T11:36:44.155584-04:00


## Sanitize text
Clean partial text scraped from SEC filings so it can be included in HTML.

In [None]:
#export

def sanitizeText(s) :
    """
    If there is an '&' among the last 10 characters of s, returns s cut off just
    before its last '&'; otherwise returns s unchanged.
    NOTE(review): presumably this drops an HTML entity that was cut short at the end
    of a text excerpt; it also drops a complete entity found there.
    """
    if '&' not in s[-10:] :
        return s
    cutAt = s.rindex('&')
    return s[:cutAt]

## Some functions for debugging

In [None]:
#export

def secBrowse(accessNo) :
    "Opens a new web browser tab on the index page of the SEC filing with the given accession number."
    indexUrl = secIndexUrl(accessNo,True)
    webbrowser.open_new_tab(indexUrl)

def printSamp(m,n=10) :
    """
    Prints a sample of n items from object m , where m is a list or dict;
    for other objects just prints the whole thing.

    Each list item is printed after its index and each dict value after its key;
    items and values are themselves sampled recursively with the same n.
    """
    if isinstance(m,list) :
        for i,item in enumerate(m[:n]) :
            print(i,end=' ')
            printSamp(item,n)
    elif isinstance(m,dict) :
        # dict views can't be sliced in Python 3, so take the sample from a list of the keys
        for k in list(m)[:n] :
            print(k,end=' ')
            printSamp(m[k],n)
    else :
        print(m)

def printErrInfoOrAccessNo(msg,infoOrAccessNo) :
    """
    Prints msg followed, on the same line, by either the full index page URL of a filing
    (if infoOrAccessNo is a str matching the accession number pattern)
    or the repr of infoOrAccessNo (for anything else).
    """
    print(msg,end=' ')
    if isinstance(infoOrAccessNo,str) and accessNoPat.match(infoOrAccessNo) :
        print(secIndexUrl(infoOrAccessNo,True))
    else :
        print(repr(infoOrAccessNo))

In [None]:
#hide
# uncomment and run to regenerate all library Python files
# from nbdev.export import notebook2script; notebook2script()