Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
257 lines (202 sloc) 5.72 KB
#!/usr/bin/env python
from __future__ import with_statement
import os
import pickle
import contextlib
import itertools
import codecs
import logging
from xml.sax import saxutils
import csv
try:
import cStringIO as StringIO
except ImportError:
import StringIO
@contextlib.contextmanager
def change_directory(directory):
    """
    Temporarily make `directory` the working directory.

    The with-statement target receives a `(previous, current)` tuple: the
    working directory that was active on entry and the one reported by
    os.getcwd() after the switch.  The previous working directory is
    restored when the block exits, whether normally or through an exception.
    """
    origin = os.getcwd()
    os.chdir(directory)
    # Ask the OS again rather than trusting `directory`: it may be relative.
    destination = os.getcwd()
    try:
        yield (origin, destination)
    finally:
        os.chdir(origin)
@contextlib.contextmanager
def pickled(filename):
    """
    Expose a dictionary persisted in the pickle file `filename`.

    The with-statement target is a function `getter(item, factory)` that
    returns the value stored under `item`, first creating it with
    `factory()` when it is missing.  When the block finishes without an
    exception the whole dictionary is written back to `filename`; if the
    block raises, nothing is written.

    Here is an example usage:
        with pickled("foo.db") as p:
            p("users", list).append(["srid", "passwd", 23])

    NOTE: unpickling can execute arbitrary code, so `filename` must only
    ever point at a trusted file.
    """
    if os.path.isfile(filename):
        # Pickle streams are binary data; close the handle deterministically.
        with open(filename, "rb") as source:
            data = pickle.load(source)
    else:
        data = {}

    def getter(item, factory):
        # The factory is only invoked when the entry does not exist yet.
        if item not in data:
            data[item] = factory()
        return data[item]

    yield getter

    # Only reached when the with-body completed without raising.
    with open(filename, "wb") as sink:
        pickle.dump(data, sink)
@contextlib.contextmanager
def redirect(object_, attr, value):
    """
    Temporarily replace the attribute `attr` of `object_` with `value`.

    The attribute's previous value is put back when the with-block exits,
    whether normally or through an exception.  Example:

        import sys
        with redirect(sys, 'stdout', open('stdout', 'w')):
            sys.stdout.write("hello\\n")   # ends up in the file
        sys.stdout.write("we're back\\n")  # ends up on the real stdout
    """
    saved = getattr(object_, attr)
    setattr(object_, attr, value)
    try:
        yield
    finally:
        setattr(object_, attr, saved)
def pathsplit(path):
    """
    Break `path` into its components at every platform path separator.

    A leading separator produces an empty first component and a trailing
    separator an empty last one.

    >>> pathsplit("/a/b/c")
    ['', 'a', 'b', 'c']
    >>> pathsplit("./plugins/builtins.ini")
    ['.', 'plugins', 'builtins.ini']
    """
    return path.split(os.sep)
def commonpath(l1, l2, common=None):
    """
    Split two component lists into their shared prefix and the remainders.

    Returns a tuple `(shared, rest_of_l1, rest_of_l2)` where `shared` holds
    the leading components that are equal in both lists.  Either list may be
    empty.  `common` is not used; it is only accepted so that existing
    callers passing it keep working.

    >>> commonpath(['', 'a', 'b', 'c', 'd'], ['', 'a', 'b', 'c1', 'd1'])
    (['', 'a', 'b'], ['c', 'd'], ['c1', 'd1'])
    >>> commonpath(['.', 'plugins', ''], ['.', 'plugins', 'builtins.ini'])
    (['.', 'plugins'], [''], ['builtins.ini'])
    >>> commonpath(['.', 'plugins', 'builtins'], ['.', 'plugins'])
    (['.', 'plugins'], ['builtins'], [])
    >>> commonpath([], ['a'])
    ([], [], ['a'])
    """
    if l1 == l2:
        return l1, [], []

    # Count the matching leading components.  Counting (instead of reusing
    # the loop variables after the loop) also works when a list is empty.
    shared = 0
    for leftDir, rightDir in zip(l1, l2):
        if leftDir != rightDir:
            break
        shared += 1
    return l1[:shared], l1[shared:], l2[shared:]
def relpath(p1, p2):
    """
    Build the relative path that leads from the directory p1 to p2.

    Both arguments are normalised with os.path.normpath first.  The result
    climbs one level ('..') for every component that only p1 has and then
    descends through the components that only p2 has.  When both paths are
    the same, the current directory followed by a separator is returned.

    >>> relpath('/', '/')
    './'
    >>> relpath('/a/b/c/d', '/')
    '../../../../'
    >>> relpath('/a/b/c/d', '/a/b/c1/d1')
    '../../c1/d1'
    >>> relpath('/a/b/c/d', '/a/b/c1/d1/')
    '../../c1/d1'
    >>> relpath("./plugins/builtins", "./plugins")
    '../'
    >>> relpath("./plugins/", "./plugins/builtins.ini")
    'builtins.ini'
    >>> relpath(".", "./plugins")
    'plugins'
    """
    sourcePath = os.path.normpath(p1)
    destPath = os.path.normpath(p2)

    # normpath spells "the current directory" as a bare os.curdir.  That is
    # not a real component: counting it would add a bogus '..' to the result
    # (relpath('.', 'a') used to give '../a').
    if sourcePath == os.curdir:
        sourceParts = []
    else:
        sourceParts = pathsplit(sourcePath)
    if destPath == os.curdir:
        destParts = []
    else:
        destParts = pathsplit(destPath)

    if sourceParts and destParts:
        (common, sourceOnly, destOnly) = commonpath(sourceParts, destParts)
    else:
        # An empty side shares nothing with the other one, so there is no
        # need to compare.
        sourceOnly, destOnly = sourceParts, destParts

    if not sourceOnly and not destOnly:
        return "." + os.sep
    return os.path.join((".." + os.sep) * len(sourceOnly), *destOnly)
class UTF8Recoder(object):
    """
    Iterator that reads an encoded stream and reencodes the input to UTF-8

    `f` is a byte stream whose content is encoded in `encoding`.  Each
    iteration step reads one line from it, decodes that line and returns it
    as a UTF-8 encoded byte string.  Both the Python 2 (`next`) and the
    Python 3 (`__next__`) iterator protocols are provided.
    """

    def __init__(self, f, encoding):
        self.reader = codecs.getreader(encoding)(f)

    def __iter__(self):
        return self

    def next(self):
        # Same steps as codecs.StreamReader's own iterator method, written
        # out so that it does not matter whether that method is spelled
        # next() or __next__() on the running interpreter.
        line = self.reader.readline()
        if not line:
            raise StopIteration
        return line.encode("utf-8")

    __next__ = next
class UnicodeReader(object):
    """
    CSV reader for a file "f" whose bytes are encoded in `encoding`.

    The input is first recoded to UTF-8 through UTF8Recoder and then parsed
    by csv.reader; every row is handed out as a list of unicode strings.
    `dialect` and any extra keyword arguments are passed on to csv.reader.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        recoded = UTF8Recoder(f, encoding)
        self.reader = csv.reader(recoded, dialect=dialect, **kwds)

    def __iter__(self):
        return self

    def next(self):
        # The csv module hands back UTF-8 byte strings; decode every cell.
        cells = self.reader.next()
        return [unicode(cell, "utf-8") for cell in cells]
class UnicodeWriter(object):
    """
    CSV writer that emits rows to the file "f" in the given `encoding`.

    Rows are first rendered as UTF-8 by csv.writer into an in-memory queue
    and then re-encoded into the target encoding before being written to
    "f".  Every cell of a row must offer an encode() method.  `dialect` and
    any extra keyword arguments are passed on to csv.writer.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # csv.writer writes into this buffer, never directly into "f".
        self.queue = StringIO.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()

    def writerow(self, row):
        utf8_cells = [cell.encode("utf-8") for cell in row]
        self.writer.writerow(utf8_cells)
        # Take the UTF-8 rendering of the row out of the buffer ...
        rendered = self.queue.getvalue().decode("utf-8")
        # ... convert it to the target encoding and pass it on ...
        self.stream.write(self.encoder.encode(rendered))
        # ... then clear the buffer for the next row.
        self.queue.truncate(0)

    def writerows(self, rows):
        for single_row in rows:
            self.writerow(single_row)
def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs):
    """
    Parse an iterable of unicode lines as CSV, yielding rows of unicode cells.

    `dialect` and any extra keyword arguments are passed on to csv.reader.
    """
    # The csv module cannot handle unicode input here, so the lines are fed
    # to it as UTF-8 and every parsed cell is decoded again afterwards.
    encoded_lines = utf_8_encoder(unicode_csv_data)
    for encoded_row in csv.reader(encoded_lines, dialect=dialect, **kwargs):
        yield [unicode(field, 'utf-8') for field in encoded_row]
def utf_8_encoder(unicode_csv_data):
    """
    Lazily yield every item of `unicode_csv_data` encoded as UTF-8.
    """
    for text_line in unicode_csv_data:
        encoded_line = text_line.encode('utf-8')
        yield encoded_line
# Entities translated in addition to the &amp;, &lt; and &gt; that
# xml.sax.saxutils always handles on its own (entity -> character).
_UNESCAPE_ENTITIES = {
    "&quot;": '"',
    "&nbsp;": " ",
    "&apos;": "'",
}

# Reverse table (character -> entity) used when escaping.
_ESCAPE_ENTITIES = dict(
    (character, entity) for (entity, character) in _UNESCAPE_ENTITIES.items()
)
# An ordinary space must stay a space when escaping; only the unescape
# direction maps &nbsp; to one.
del _ESCAPE_ENTITIES[" "]


def unescape(text):
    """
    Return `text` with the entities &quot;, &nbsp; and &apos; (plus the
    &amp;, &lt; and &gt; handled by saxutils) replaced by plain characters.
    """
    return saxutils.unescape(text, _UNESCAPE_ENTITIES)


def escape(text):
    """
    Return `text` with double and single quotes (plus the &, < and >
    handled by saxutils) replaced by their entities.  Spaces are untouched.
    """
    return saxutils.escape(text, _ESCAPE_ENTITIES)
class ErrorLogHandler(logging.Handler):
    """
    Logging handler that forwards formatted records to an error log object.

    `errorLog` must provide `push_message(message, level)`; it is called
    once per emitted record with the formatted text (without traceback) and
    the record's numeric level.
    """

    def __init__(self, errorLog, level = logging.NOTSET):
        logging.Handler.__init__(self, level = level)
        self._errorLog = errorLog

    def emit(self, record):
        try:
            # We don't want to write a custom formatter just to avoid
            # tracebacks, so the exception details are hidden while the
            # record is formatted.
            exc, exc_text = record.exc_info, record.exc_text
            record.exc_info, record.exc_text = None, None
            try:
                msg = self.format(record)
            finally:
                # Put the details back even when formatting fails; handlers
                # that see this record later still need them.
                record.exc_info, record.exc_text = exc, exc_text
            self._errorLog.push_message(msg, record.levelno)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            # Any other failure goes through logging's own error reporting
            # instead of propagating into the code that logged the message.
            self.handleError(record)

    def createLock(self):
        # No lock object is created, so this handler does no locking.
        self.lock = None