Skip to content
Find file
Fetching contributors…
Cannot retrieve contributors at this time
executable file 515 lines (414 sloc) 13.3 KB
#!/usr/bin/python
# In progress proof of concept for distributed parallel bzip2 compression.
# Breaks incoming data stream into blocks, sends them out for separate
# compression on multiple threads.
#
# Brion Vibber <brion@pobox.com>
# 2006-05-12
# TODO:
# Accept file input/output, behavior like bzip2
import bz2
import ConfigParser
import getopt
import os
import popen2
import random
import socket
import struct
import sys
import thread
import time
from thread import start_new_thread
import dbzutil
import DistBits
from sigcheck import findBzTrail
def processorCount():
try:
# Works on Linux
return os.sysconf("SC_NPROCESSORS_ONLN")
except:
try:
# Works on Mac OS X
(outstream, instream) = popen2.popen2("/usr/sbin/sysctl -n hw.ncpu 2>/dev/null")
output = outstream.read()
instream.close()
outstream.close()
return int(output)
except:
# Give up
return 1
class Compressor(object):
def __init__(self, args):
self.inputStream = sys.stdin
self.outputStream = sys.stdout
self.blockSize = 900000 # 900k default blocksize
self.threads = processorCount()
self.remotes = []
self.verbosity = 0
self.blockSize100k = 9
self.compressing = True
self.configFiles = [
os.path.join(os.path.dirname(sys.argv[0]), "dbzip2.conf"),
"/etc/dbzip2.conf",
os.path.join(os.getenv("HOME"), ".dbzip2.conf")]
self.timeoutMin = 1 # Initial timeout
self.timeoutMax = 64 # Exponential backoff up to X seconds
self.crc = 0L
self.blocksRead = 0
self.blocksCompressed = 0
self.blocksWritten = 0
self.bytesRead = 0L
self.bytesWritten = 0L
self.runningThreads = 0
self.workingThreads = 0
self.compressors = []
self.inputQueue = [] # blocks to be compressed
self.outputQueue = [] # buffers to be written
self.done = False
self.threadLock = thread.allocate_lock()
self.processConfigFiles()
self.processArgs(args)
def processArgs(self, args):
(options, remainder) = getopt.getopt(args, "123456789dp:r:v")
for (opt, val) in options:
if opt >= "-1" and opt <= "-9":
self.blockSize100k = int(opt[1])
elif opt == "-d":
self.compressing = False
elif opt == "-r":
self.remotes.append(self.splitHost(val))
elif opt == "-p":
self.threads = int(val)
elif opt == "-v":
self.verbosity += 1
def processConfigFiles(self):
conf = ConfigParser.SafeConfigParser()
conf.read(self.configFiles)
if conf.has_option("local", "threads"):
self.threads = conf.getint("local", "threads")
if conf.has_section("remote"):
for (name, val) in conf.items("remote"):
self.remotes.append(self.splitHost(val))
if conf.has_option("debug", "verbosity"):
self.verbosity = conf.getint("debug", "verbosity")
def splitHost(self, val):
if ":" in val:
(host, port) = val.split(":")
return (host, int(port))
else:
return (val, 16986)
def debug(self, level, text):
if self.verbosity >= level:
sys.stderr.write(text + "\n")
def error(self, text):
self.debug(0, text)
def run(self):
"""Start up the threads and goooo!"""
for i in range(0, self.threads):
self.compressors.append(LocalCompressor(self.blockSize100k))
for addr in self.remotes:
self.compressors.append(RemoteCompressor(addr, self.blockSize100k))
assert len(self.compressors) >= 1
start_new_thread(self.readerThread, ())
for compressor in self.compressors:
start_new_thread(self.compressorThread, (compressor,))
if self.threads == 0:
# If we're only using remote threads, we risk hanging if all
# remote servers go offline. Prepare a local thread to run
# on an emergency basis.
start_new_thread(self.compressorThread,
(LocalCompressor(self.blockSize100k), True))
self.writerThread()
def sleep(self):
"""Wait a short time when out of data."""
time.sleep(0.01)
def lock(self):
self.threadLock.acquire()
assert self.threadLock.locked()
def unlock(self):
self.threadLock.release()
def incRunning(self, amount=1):
self.lock()
self.runningThreads += amount
self.unlock()
def incWorking(self, amount=1):
self.lock()
self.workingThreads += amount
self.unlock()
def readerThread(self):
"""Producer thread: run through the file handing out blocks."""
self.debug(2, "readerThread: starting!")
if self.compressing:
dbzutil.readblock(self.inputStream, self.dispatch, self.blockSize100k)
else:
# hack hack hack!
# should work on single-block files :)
inchunk = self.inputStream.read(1000000)
self.dispatch(inchunk)
self.done = True
self.debug(2, "readerThread: done; read %d blocks" % self.blocksRead)
def nextBlock(self):
buffer = self.inputStream.read(self.blockSize - 1000) # some headroom for RLE fail
self.bytesRead += len(buffer)
self.debug(3, "nextBlock: %d" % len(buffer))
return buffer
def ready(self):
"""Check if we have some free compressors. No sense filling up RAM."""
return len(self.inputQueue) < len(self.compressors)
# Queue management
def dispatch(self, block):
"""Queue a block of data for remote compression."""
while not self.ready():
self.debug(4, "readerThread: full at %d; waiting" % len(self.inputQueue))
self.sleep()
self.lock()
self.blocksRead += 1
self.bytesRead += len(block)
self.debug(2, "readerThread: dispatching block %d" % self.blocksRead)
buffer = QueuedBuffer(self.blocksRead, block, self.compressing)
self.inputQueue.append(buffer) # To the compressor threads
self.outputQueue.append(buffer) # To the writer thread, in order!
self.unlock()
def dequeueInput(self):
"""Fetch the next available block for compression."""
assert self.threadLock.locked()
if len(self.inputQueue):
return self.inputQueue.pop(0)
else:
return None
def dequeueOutput(self):
"""Fetch the next completed block for writing."""
assert self.threadLock.locked()
if len(self.outputQueue) and self.outputQueue[0].ready():
return self.outputQueue.pop(0)
else:
return None
def writerThread(self):
"""Consumer thread: as we receive compressed blocks from the
distributed compressors, write them to the output file.
Currently only writes blocks in order."""
self.debug(2, "writerThread: starting")
startTime = time.time()
if self.compressing:
self.bitStream = dbzutil.Bitstream(self.outputStream)
self.writeHeader()
while not (self.done and self.blocksWritten == self.blocksRead):
self.lock()
buffer = self.dequeueOutput()
self.unlock()
if buffer:
self.debug(4, "writerThread: wtf")
self.writeBuffer(buffer)
else:
self.debug(4, "writerThread: sleeping")
self.sleep()
if self.compressing:
self.writeTrailer()
self.bitStream.flush()
# Wait for all compressor threads to complete out.
while self.runningThreads > 0:
self.sleep()
delta = time.time() - startTime
megabyte = 1024.0 * 1024.0
rateIn = (float(self.bytesRead) / megabyte) / delta
rateOut = (float(self.bytesWritten) / megabyte) / delta
self.debug(1, "Wrote %d blocks in %0.1f seconds (%0.3f MB/s in, %0.3f MB/s out)" % (
self.blocksWritten, delta, rateIn, rateOut))
def writeBuffer(self, buffer):
"""Write a buffer to the file. Currently requires that buffers
be processed in streaming order."""
output = buffer.output
self.blocksWritten += 1
self.bytesWritten += len(output)
self.debug(2, "writeBuffer: writing block %d (%d blocks, %d bytes)" %
(buffer.index, self.blocksWritten, self.bytesWritten))
assert buffer.output is not None
assert buffer.index == self.blocksWritten
if self.compressing:
(offset, overflow, crc) = findBzTrail(buffer.output)
self.debug(2, "writeBuffer: block stream crc %08x, offset %d bits" %
(crc, offset))
# Write out the block, with the head and tail cropped off
nbits = (len(buffer.output) - 14) * 8
if offset:
# Adjust for non-aligned footer
nbits = nbits - 8 + offset
self.bitStream.write(buffer.output[4:-10], nbits)
# Assuming there's only one output block per input block
# This had better hold :D
blockCrc = struct.unpack(">L", buffer.output[10:14])[0]
self.crc = ((self.crc << 1) & 0xffffffffL | self.crc >> 31) ^ blockCrc
else:
self.outputStream.write(buffer.output)
def writeHeader(self):
self.debug(4, "writing file header")
# hardcoded 900k blocksize
self.bitStream.write("BZh" + str(self.blockSize100k))
def writeTrailer(self):
self.debug(4, "writing file trailer, combined CRC %08x" % self.crc)
# Stream-end magic number:
self.bitStream.write("\x17\x72\x45\x38\x50\x90")
# 32-bit combined CRC
self.bitStream.write(struct.pack(">L", self.crc))
def compressorThread(self, compressor, emergency=False):
"""Worker thread: send a block to a foreign server and receive data."""
blocksCompressed = 0
bytesRead = 0L
bytesWritten = 0L
timeout = self.timeoutMin
online = False
self.incRunning(1)
startTime = time.time()
self.debug(3, "compressorThread: Started")
try:
compressor.open()
online = True
self.incWorking(1)
except:
self.error("%s failed to connect; retry in %d secs" % \
(compressor, timeout))
time.sleep(timeout)
while not (self.done and self.blocksCompressed == self.blocksRead):
if emergency and self.workingThreads > 1:
# This is a backup thread for when all remote servers
# are down and no regular local threads are scheduled.
self.sleep()
continue
if not online:
try:
compressor.reopen()
except:
if timeout < self.timeoutMax:
timeout *= 2
self.error("%s failed to reconnect; retry in %d secs" %\
(compressor, timeout))
time.sleep(timeout)
continue
online = True
self.error("%s reconnected" % compressor)
self.incWorking(1)
self.lock()
buffer = self.dequeueInput()
self.unlock()
if buffer:
self.debug(4, "compressorThread: compressing")
try:
if buffer.compressing:
data = compressor.compress(buffer.input)
else:
data = compressor.decompress(buffer.input)
except:
online = False
timeout = self.timeoutMin
self.error("%s dropped connection; retry in %d secs" % \
(compressor, timeout))
# Return the buffer to the queue so another thread
# can pick it up, or we'll be stuck forever.
self.lock()
self.workingThreads -= 1
self.inputQueue.append(buffer)
self.unlock()
time.sleep(timeout)
continue
self.lock()
buffer.set(data)
self.blocksCompressed += 1
blocksCompressed += 1
bytesRead += len(buffer.input)
bytesWritten += len(buffer.output)
self.debug(4, "compressorThread: compressed %d blocks" % \
self.blocksCompressed)
self.unlock()
else:
self.debug(4, "compressorThread: no input, sleeping")
self.sleep()
if online:
try:
compressor.close()
except:
self.error("Error closing %s" % compressor)
pass
delta = time.time() - startTime
megabyte = 1024.0 * 1024.0
rateIn = (float(bytesRead) / megabyte) / delta
rateOut = (float(bytesWritten) / megabyte) / delta
self.debug(1, "%s: processed %d blocks in %0.1f seconds (%0.3f MB/s in, %0.3f MB/s out)" % (
compressor, blocksCompressed, delta, rateIn, rateOut))
self.incRunning(-1)
class QueuedBuffer(object):
"""Placeholder for received compressed buffer items."""
def __init__(self, index, input, compressing):
"""Initialize an empty placeholder, no data yet."""
self.index = index
self.input = input
self.output = None
self.compressing = compressing
def ready(self):
return self.output is not None
def set(self, data):
"""Store data and declare that we're ready to be flushed out."""
assert self.output is None
assert data is not None
self.output = data
class LocalCompressor(object):
"""Compression tasks to run on a local thread."""
def __init__(self, blockSize100k):
self.blockSize100k = blockSize100k
def compress(self, block):
return bz2.compress(block, self.blockSize100k)
def decompress(self, block):
return bz2.decompress(block)
def open(self):
pass
def close(self):
pass
def reopen(self):
pass
def __str__(self):
return "local thread"
class RemoteCompressor(object):
def __init__(self, address, blockSize100k):
"""Address is a (host, port) tuple."""
self.address = address
self.blockSize100k = blockSize100k
def open(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect(self.address)
self.connection = DistBits.Connection(self.socket)
self.connection.send("COMP", struct.pack(">l", 2))
self.connection.send("ALGO", "bzip2")
self.connection.send("BLOK", str(self.blockSize100k))
def compress(self, data):
self.connection.send("HUGE", data)
(atom, retdata) = self.connection.receive()
if atom == "SMAL":
return retdata
elif atom == "EROR":
raise Exception(data)
else:
raise Exception("Unknown return atom type")
def decompress(self, data):
self.connection.send("SMAL", data)
(atom, retdata) = self.connection.receive()
if atom == "HUGE":
return retdata
elif atom == "ERROR":
raise Exception(data)
else:
raise Exception("Unknown return atom type")
def reopen(self):
try:
self.close()
except:
pass
self.open()
def close(self):
self.connection.send("CLOS")
self.connection.close()
self.socket.close()
self.connection = None
def __str__(self):
return self.address[0] + ":" + str(self.address[1])
if __name__ == "__main__":
compressor = Compressor(sys.argv[1:])
compressor.run()
Something went wrong with that request. Please try again.