#!/usr/bin/env python

import bson
import os
import simplejson as json
import struct

import couchbaseConstants
import pump

BSON_SCHEME = "bson://"

class BSONSource(pump.Source):
"""Reads bson file."""
def __init__(self, opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur):
super(BSONSource, self).__init__(opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur)
self.done = False
self.f = None

    @staticmethod
    def can_handle(opts, spec):
        return spec.startswith(BSON_SCHEME) and \
            os.path.isfile(spec.replace(BSON_SCHEME, ""))

    @staticmethod
    def check(opts, spec):
        return 0, {'spec': spec,
                   'buckets': [{'name': os.path.basename(spec),
                                'nodes': [{'hostname': 'N/A'}]}]}

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        return 0, None
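
    # Usage sketch (assumed from the signatures above, not from pump.py
    # itself): the pump framework is expected to probe source classes with
    # can_handle() and then call check() to obtain a minimal source map whose
    # single pseudo-bucket is named after the .bson file, e.g.:
    #
    #     spec = "bson:///tmp/dump.bson"          # hypothetical path
    #     if BSONSource.can_handle(opts, spec):
    #         rv, source_map = BSONSource.check(opts, spec)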

    def provide_batch(self):
        if self.done:
            return 0, None

        if not self.f:
            try:
                self.f = open(self.spec.replace(BSON_SCHEME, ""))
            except IOError, e:
                return "error: could not open bson: %s; exception: %s" % \
                    (self.spec, e), None

        batch = pump.Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']

        cmd = couchbaseConstants.CMD_TAP_MUTATION
        vbucket_id = 0x0000ffff
        cas, exp, flg = 0, 0, 0

        while (self.f and
               batch.size() < batch_max_size and
               batch.bytes < batch_max_bytes):
            # Each BSON document starts with a 4-byte little-endian int32
            # holding the total document size, including the size field itself.
            doc_size_buf = self.f.read(4)
            if not doc_size_buf:
                self.done = True
                self.f.close()
                self.f = None
                break
            doc_size, = struct.unpack("<i", doc_size_buf)
            doc_buf = self.f.read(doc_size - 4)
            if not doc_buf:
                self.done = True
                self.f.close()
                self.f = None
                break
            doc = bson._elements_to_dict(doc_buf, dict, True)
            key = doc['_id']
            doc_json = json.dumps(doc)
            # Feed the document into the batch as a TAP mutation, keyed by _id.
            msg = (cmd, vbucket_id, key, flg, exp, cas, '', doc_json, 0, 0, 0)
            batch.append(msg, len(doc))

        if batch.size() <= 0:
            return 0, None

        return 0, batch
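
# ---------------------------------------------------------------------------
# Illustration (not part of the original module): a minimal sketch of the
# length-prefix framing that provide_batch() relies on. Every document in a
# .bson dump starts with a 4-byte little-endian int32 giving the total
# document size, size field included, which is why the loop above reads 4
# bytes, unpacks them with "<i", and then reads doc_size - 4 more bytes.
# The helper name iter_bson_docs is hypothetical.

def iter_bson_docs(path):
    """Yield raw BSON documents (size prefix included) from a .bson file."""
    with open(path, "rb") as f:
        while True:
            size_buf = f.read(4)
            if len(size_buf) < 4:
                break                                 # clean end of file
            doc_size, = struct.unpack("<i", size_buf)
            body = f.read(doc_size - 4)
            if len(body) < doc_size - 4:
                break                                 # truncated document
            yield size_buf + body

# With a recent pymongo, each yielded chunk can be decoded to a dict with
# bson.decode(raw); the module above instead passes the body (everything
# after the size prefix) to the older, private bson._elements_to_dict().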