Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Doug Woos committed Apr 23, 2012
0 parents commit 5c40fe0
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 0 deletions.
1 change: 1 addition & 0 deletions README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Gamechanger presplitting code
6 changes: 6 additions & 0 deletions __init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from balancer import Balancer
import util
from objectid import ObjectId
from fake_balancer import FakeBalancer
from mongo_logger import MongoLogger
from presplitter import PreSplitter
111 changes: 111 additions & 0 deletions balancer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import pymongo
import logging
from objectid import ObjectId

class Balancer(object):
    """
    Provides various load-balancing services for a sharded collection.

    Wraps the mongos `split` and `moveChunk` admin commands and reads
    chunk metadata from the `config` database.
    """

    def __init__(self, conn, ns, key):
        # conn is expected to be a pymongo connection to a mongos.
        self._config = conn.config
        self._admin = conn.admin
        self._ns = ns    # full namespace, e.g. 'db.collection'
        self._key = key  # name of the shard-key field

    def chunk_for_key(self, key):
        """
        Returns the chunk that contains the key <key>, or None if no
        such chunk exists.  Chunk ranges are [min, max): min inclusive,
        max exclusive.
        """
        return self._config.chunks.find_one({'ns': self._ns,
                                             'min.%s' % self._key: {'$lte': key},
                                             'max.%s' % self._key: {'$gt': key}})

    def split_chunk(self, key):
        """
        Wrapper on top of the mongo split command.  Splits the chunk
        containing <key> at exactly <key>; finds and returns the two
        resulting chunks as (lower, upper).
        """
        self._admin.command('split', self._ns, middle={self._key: key})
        return (self._config.chunks.find_one({'max.%s' % self._key: key}),
                self._config.chunks.find_one({'min.%s' % self._key: key}))

    def move_chunk(self, key, dest):
        """
        Wrapper on top of the mongo moveChunk command.  Moves the chunk
        containing <key> to the shard named <dest>.  Best-effort:
        failures are logged, not raised.  Returns None.
        """
        try:
            self._admin.command('moveChunk', self._ns,
                                find={self._key: key},
                                to=dest)
        except pymongo.errors.OperationFailure as e:
            logging.warning("Got an OperationFailure.")
            logging.warning("This is likely just Mongo complaining because this shard is already at its destination.")
            logging.warning(e.args)

    def divide_chunk(self, key, divisions):
        """
        Divide the chunk specified by <key> into <divisions> separate
        chunks, each containing an equal key range.  Returns the list
        of resulting chunks (just the original chunk if divisions < 2).
        """
        chunk = self.chunk_for_key(key)
        new_chunks = []
        # Iteratively peel an equal-sized range off the front of the
        # remaining chunk.  Splitting at lo + (hi - lo) / remaining on
        # each pass yields <divisions> equal ranges overall (replaces
        # the recursive helper: same math, no recursion-depth limit,
        # no shadowing of the min/max builtins).
        remaining = divisions
        while remaining >= 2:
            lo = ObjectId(chunk['min'][self._key])
            hi = ObjectId(chunk['max'][self._key])
            split = lo + (hi - lo) / remaining
            logging.info('splitting at %s', split)
            new, chunk = self.split_chunk(split)
            new_chunks.append(new)
            remaining -= 1
        new_chunks.append(chunk)
        return new_chunks

    def chunks_for_range(self, start, end):
        """
        All of the chunks between the <start> and <end> keys,
        inclusive.  Returns a generator; yields nothing when either
        endpoint falls outside every chunk.
        """

        min_chunk = self.chunk_for_key(start)
        max_chunk = self.chunk_for_key(end)
        if not min_chunk or not max_chunk:
            return
        if min_chunk['_id'] == max_chunk['_id']:
            yield min_chunk
            return

        # Chunks strictly between the two endpoint chunks, in key order.
        other_chunks = self._config.chunks.find({'min.%s' % self._key:
                                                 {'$gt': min_chunk['min'][self._key],
                                                  '$lt': max_chunk['min'][self._key]}})
        other_chunks = other_chunks.sort('min.%s' % self._key)

        yield min_chunk
        for chunk in other_chunks:
            yield chunk
        yield max_chunk

    def balance_range(self, start, end, shards):
        """
        Divide a range of chunks between any number of shards.  Returns
        the summed result of move_chunk calls (0 for this class, whose
        move_chunk returns None; subclasses may report document counts),
        or None when no chunks cover the range.
        """
        chunks = list(self.chunks_for_range(start, end))
        if not chunks:
            logging.info('no chunks found')
            return

        # Trim the boundary chunks so only keys in [start, end] move.
        if chunks[0]['min'][self._key] < start and chunks[0]['max'][self._key] != start:
            chunks[0] = self.split_chunk(start)[1]
        if chunks[-1]['max'][self._key] > end and chunks[-1]['min'][self._key] != end:
            chunks[-1] = self.split_chunk(end)[0]

        total = 0
        for chunk in chunks:
            logging.info('dividing chunk %s', chunk)
            new_chunks = self.divide_chunk(chunk['min'][self._key], len(shards))
            for i, shard in enumerate(shards):
                total += self.move_chunk(new_chunks[i]['min'][self._key], shard) or 0
        return total
49 changes: 49 additions & 0 deletions fake_balancer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pymongo
from balancer import Balancer

class FakeBalancer(Balancer):
    """
    Act like a balancer, but make changes to a fake config collection
    instead of the actual sharding config.
    """

    def __init__(self, conn, ns, key, coll_name='fake_config'):
        super(FakeBalancer, self).__init__(conn, ns, key)
        # fake balancer uses fake config
        self._config = conn[coll_name]
        # cache of shard name -> pymongo connection
        self._connections = {}

    def _collection_for_chunk(self, chunk):
        """
        Return (and cache) a connection to the shard that owns <chunk>.
        """
        if chunk['shard'] not in self._connections:
            shard = self._config.shards.find_one({'_id': chunk['shard']})
            self._connections[chunk['shard']] = pymongo.Connection(shard['host'],
                                                                   slave_okay=True)
        return self._connections[chunk['shard']]

    def _count_for_chunk(self, chunk):
        """
        Count the documents on <chunk>'s shard whose shard key falls
        inside the chunk's [min, max] range.
        """
        # Split the namespace on the FIRST dot only: collection names
        # may themselves contain dots (e.g. 'db.coll.sub'), and the old
        # ''.join(spec[1:]) silently mangled them to 'collsub'.
        db, _, coll = chunk['ns'].partition('.')
        return self._collection_for_chunk(chunk)[db][coll].find(
            {self._key: {'$gte': chunk['min'][self._key],
                         '$lte': chunk['max'][self._key]}}).count()

    def split_chunk(self, key):
        """
        Simulate the behavior of mongo's split command.  Rewrites the
        chunk containing <key> in the fake config and returns the two
        resulting chunks as (lower, upper).
        """
        chunk = self.chunk_for_key(key)
        new_chunk = chunk.copy()
        # Both halves share the split-point dict: old max == new min.
        chunk['max'] = new_chunk['min'] = {self._key: key}
        new_chunk['_id'] = '%s-%s_%s' % (self._ns, self._key, repr(key))
        self._config.chunks.save(chunk)
        self._config.chunks.save(new_chunk)
        return chunk, new_chunk

    def move_chunk(self, key, dest):
        """
        Simulate the behavior of mongo's moveChunk command.  Reassigns
        the chunk containing <key> to shard <dest> and returns the
        number of documents in the chunk.
        """
        chunk = self.chunk_for_key(key)
        chunk['shard'] = dest
        self._config.chunks.save(chunk)
        return self._count_for_chunk(chunk)
13 changes: 13 additions & 0 deletions mongo_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import logging
import time

class MongoLogger(object):
    """
    Records which ids have been processed, in a mongo collection.

    Each logged id is stored as a document of the form
    {'_id': id, 'p': True, 'd': <epoch seconds>, 'n': <count>}.
    """

    def __init__(self, coll):
        self._coll = coll

    def log_id(self, id, count=0):
        """Mark <id> as processed, stamping the current time and <count>."""
        doc = {'_id': id, 'p': True, 'd': time.time(), 'n': count}
        self._coll.save(doc)

    def check_id(self, id):
        """Return the processed-marker document for <id>, or None."""
        query = {'_id': id, 'p': True}
        return self._coll.find_one(query)
65 changes: 65 additions & 0 deletions objectid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from bson.objectid import ObjectId as BaseObjectId

class ObjectId(BaseObjectId):
    """
    Adds some methods which are useful for dealing with client ids and
    for converting to/from dates and ints.

    NOTE: because this subclass is also named ObjectId, `self.__id`
    mangles to `_ObjectId__id` and so reaches the private binary id
    attribute that BaseObjectId (bson.objectid.ObjectId) stores -- do
    not rename the class without revisiting those accesses.
    """

    def __init__(self, oid, client=False):
        # Propagate client-ness when wrapping another ObjectId.
        self.client = getattr(oid, 'client', None) or client
        super(ObjectId, self).__init__(oid)

    def to_client_id(self):
        """Return a copy of this id with its bytes permuted into client order."""
        remap = [0, 1, 4, 5, 6, 7, 8, 2, 3, 9, 10, 11]
        # do the inverse gc device shuffle, boop boop de doop
        raw = list(self.__id)
        remapped = [raw[remap[i]] for i in range(12)]
        return ObjectId("".join(remapped), client=True)


    def to_server_id(self):
        """Return a copy of this id with its bytes permuted back into server order."""
        remap = [0, 1, 7, 8, 2, 3, 4, 5, 6, 9, 10, 11]
        # do the gc device shuffle, boop boop de doop
        raw = list(self.__id)
        remapped = [raw[remap[i]] for i in range(12)]
        return ObjectId("".join(remapped), client=False)

    @property
    def generation_time(self):
        # Client ids have shuffled bytes, so convert back to server
        # order before asking bson for the embedded timestamp.
        if self.client:
            return self.to_server_id().generation_time
        return super(ObjectId, self).generation_time

    def __int__(self):
        # Interpret the 24-char hex representation as one big integer.
        return int(str(self), 16)

    @classmethod
    def from_int(cls, x):
        """Inverse of __int__: build an ObjectId from a non-negative int."""
        # '%024x' zero-pads to the 24 hex chars an ObjectId requires
        # (hex(x)[2:] would drop leading zeros and be rejected as an
        # invalid id) and, unlike hex(), never appends an 'L' suffix.
        return cls('%024x' % x)

    def __add__(self, other):
        return self.from_int(int(self) + int(other))

    def __sub__(self, other):
        return int(self) - int(other)

    @classmethod
    def _pop_from_dict(cls, d, k):
        # Remove and return d[k] if present, else None.
        return d.pop(k, None)

    @classmethod
    def from_datetime(cls, *args, **kwargs):
        """
        Like BaseObjectId.from_datetime, but accepts min=True or
        max=True to clamp the non-timestamp bytes to their extreme
        values (useful as range-query endpoints).
        """
        want_min = cls._pop_from_dict(kwargs, 'min')
        want_max = cls._pop_from_dict(kwargs, 'max')

        assert not (want_min and want_max), "Both min and max specified"

        oid = ObjectId(BaseObjectId.from_datetime(*args, **kwargs))
        if want_min or want_max:
            # NOTE(review): this keeps only the first 2 of the 4
            # timestamp bytes before padding with 10 fill bytes --
            # confirm that discarding the low timestamp bytes is
            # intentional.
            oid = ObjectId(oid.__id[:2] + ('\x00' if want_min else '\xff') * 10)
        return oid
44 changes: 44 additions & 0 deletions presplitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import logging

from datetime import datetime, time, timedelta
from objectid import ObjectId

_midnight = time()
_noon = time(hour=12)
_day = timedelta(days=1)

class PreSplitter(object):
    """
    Pre-splits and balances a day's worth of chunks via a balancer,
    recording progress through an optional logger so the work is not
    repeated.
    """

    def __init__(self, balancer, shards, logger=None):
        self._balancer = balancer
        self._shards = shards
        self._logger = logger

    def _check_id(self, id):
        # Without a logger every id reports as already handled.
        # NOTE(review): that makes presplit() a no-op when no logger is
        # supplied -- confirm this is the intended behavior.
        if self._logger:
            return self._logger.check_id(id)
        return True

    def _log_id(self, id, count=0):
        # Record <id> as handled; no-op without a logger.
        if self._logger:
            return self._logger.log_id(id, count)
        return None

    def presplit(self, date):
        """
        Split and balance the chunks covering <date>, using the
        midnight / noon / next-midnight boundary ids that have not
        already been logged as handled.
        """
        day_start = datetime.combine(date, _midnight)
        day_middle = datetime.combine(date, _noon)
        day_end = day_start + _day

        boundaries = (day_start, day_middle, day_end)
        pending = [ObjectId.from_datetime(d, min=True) for d in boundaries]
        pending = [id for id in pending if not self._check_id(id)]

        if not pending:
            logging.info('have already pre-split all chunks for %s' % date)
            return

        start = min(pending)
        end = ObjectId.from_datetime(day_end, max=True)

        count = self._balancer.balance_range(start, end, self._shards)
        # Only the id that actually started the balanced range carries
        # the moved-document count; the rest are logged with count=0.
        for id in pending:
            self._log_id(id, count if id == start else 0)

0 comments on commit 5c40fe0

Please sign in to comment.