Skip to content

Commit

Permalink
Fixes for alternate pythons
Browse files Browse the repository at this point in the history
  • Loading branch information
Bob Corsaro committed Feb 4, 2016
1 parent 7f2ea3d commit d63c781
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
6 changes: 3 additions & 3 deletions tests/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from tubing.ext import elasticsearch
from tubing import sinks, sources, pipe

EXPECTED = """{"update": {"_type": "test", "_id": "id0"}}
EXPECTED = b"""{"update": {"_type": "test", "_id": "id0"}}
{"doc": {"name": "id0"}, "doc_as_upsert": true}
{"update": {"_type": "test-child", "_id": "id1", "parent": "id0"}}
{"doc": {"name": "id1"}, "doc_as_upsert": false}
Expand Down Expand Up @@ -32,7 +32,7 @@ def make_du(i):
doc_type='test',
)

sink.write([make_du(i) for i in xrange(0,150)])
sink.write([make_du(i) for i in range(0,150)])
sink.done()

self.assertEqual(301, len(buffer0.getvalue().split("\n")))
self.assertEqual(301, len(buffer0.getvalue().split(b"\n")))
10 changes: 5 additions & 5 deletions tubing/ext/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ def action(self):
)
if self.parent_esid:
action['update']['parent'] = self.parent_esid
return json.dumps(action)
return json.dumps(action).encode('utf-8')

def update(self):
return json.dumps(dict(
doc_as_upsert=self.doc_as_upsert,
doc=self.doc,
))
)).encode('utf-8')



Expand All @@ -59,14 +59,14 @@ def __init__(self, sink, batch_size=7500):

def write(self, docs):
for doc in docs:
self.batch.append("{}\n{}\n".format(doc.action(), doc.update()))
self.batch.append(doc.action() + b"\n" + doc.update() + b"\n")
if len(self.batch) > self.batch_size:
batch = ''.join(self.batch[:self.batch_size])
batch = b''.join(self.batch[:self.batch_size])
self.batch = self.batch[self.batch_size:]
self.sink.write(batch)

def done(self):
self.sink.write(''.join(self.batch))
self.sink.write(b''.join(self.batch))
return super(BulkBatcherSink, self).done()


Expand Down

0 comments on commit d63c781

Please sign in to comment.