Skip to content

Commit

Permalink
Merge bec525f into 44d8245
Browse files Browse the repository at this point in the history
  • Loading branch information
roll committed Oct 4, 2019
2 parents 44d8245 + bec525f commit 97be6cf
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 5 deletions.
29 changes: 25 additions & 4 deletions dataflows/processors/sort_rows.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,42 @@
import decimal
import re
import struct

from bitstring import BitArray
from kvfile import KVFile

from ..helpers.resource_matcher import ResourceMatcher


class KeyCalc(object):
    """Build a string sort key for a row from a ``str.format`` template.

    ``key_spec`` is a template such as ``'{a}{b}'``. Calling the instance
    with a row (a mapping of field name to value) returns the formatted key.
    Numeric values of fields named in the template are first replaced by a
    fixed-width hex string whose string order matches their numeric order.
    """

    _SIGN_BIT = 0x8000000000000000
    _ALL_BITS = 0xFFFFFFFFFFFFFFFF

    def __init__(self, key_spec):
        self.key_spec = key_spec
        # Raw contents of every ``{...}`` placeholder found in the template.
        self.key_list = re.findall(r'\{(.*?)\}', key_spec)

    @classmethod
    def _sortable_number(cls, value):
        """Return a 16-digit hex string that sorts like ``value`` does.

        The number is converted to an IEEE 754 double
        (https://www.h-schmidt.net/FloatConverter/IEEE754.html) and its
        64-bit pattern is remapped: non-negative values get the sign bit
        set, negative values get every bit inverted.

        Raises OverflowError if ``value`` is an int too large for a double.
        """
        (bits,) = struct.unpack('>Q', struct.pack('>d', float(value)))
        # Branch on the sign bit rather than on ``value < 0`` so that -0.0
        # lands directly below +0.0 instead of below every negative number.
        if bits & cls._SIGN_BIT:
            bits ^= cls._ALL_BITS
        else:
            bits |= cls._SIGN_BIT
        return '{:016x}'.format(bits)

    def __call__(self, row):
        """Return the formatted sort key for ``row``; ``row`` is not mutated."""
        context = row.copy()
        for key, value in row.items():
            # We need to stringify some types to make them properly comparable
            if key in self.key_list and isinstance(value, (int, float, decimal.Decimal)):
                context[key] = self._sortable_number(value)
        return self.key_spec.format(**context)


def _sorter(rows, key_calc, reverse, batch_size):
    """Yield the items of ``rows`` in the order of their computed keys.

    Each row is stored in a ``KVFile`` under ``key_calc(row)`` followed by
    the row's index as 8 zero-padded hex digits, then read back through
    ``KVFile.items(reverse=reverse)``.

    :param rows: iterable of rows to sort
    :param key_calc: callable returning a string key for a row
    :param reverse: passed through to ``KVFile.items``
    :param batch_size: passed through to ``KVFile.insert``
    """
    store = KVFile()
    # The index suffix presumably keeps rows with equal keys from
    # overwriting each other in the store -- confirm against kvfile.
    keyed_rows = (
        (key_calc(item) + "{:08x}".format(index), item)
        for index, item in enumerate(rows)
    )
    store.insert(keyed_rows, batch_size=batch_size)
    for _key, item in store.items(reverse=reverse):
        yield item

Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def read(*paths):
'tabulate',
'tableschema-sql',
'xmljson',
'bitstring>=3',
]
SPEEDUP_REQUIRES = [
'plyvel',
Expand Down
73 changes: 72 additions & 1 deletion tests/test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,78 @@ def test_sort_reverse_many_rows():
results, _, _ = f.results()
results = results[0]
assert results[0:2] == [{'a': 999, 'b': 4}, {'a': 994, 'b': 4}]
assert results[998:1000] == [{'a': 100, 'b': 0}, {'a': 0, 'b': 0}]
assert results[998:1000] == [{'a': 5, 'b': 0}, {'a': 0, 'b': 0}]


def test_sort_rows_number():
    """Numbers of mixed sign and magnitude come back in ascending order."""
    from dataflows import sort_rows

    unsorted_values = [
        0.1, -3, -4, 10, 8, 0, -1000000, 1000000, -0.1, -0.2,
        0.2, -1000001, 1000001, 6, -10, -0.001, 0.001, 1, -1,
    ]
    expected_values = [
        -1000001, -1000000, -10, -4, -3, -1, -0.2, -0.1, -0.001, 0,
        0.001, 0.1, 0.2, 1, 6, 8, 10, 1000000, 1000001,
    ]

    flow = Flow(
        [{'a': value} for value in unsorted_values],
        sort_rows(key='{a}'),
    )
    results, _, _ = flow.results()
    assert list(results[0]) == [{'a': value} for value in expected_values]


def test_sort_rows_datetime():
    """Dates used as the sort key come back in chronological order."""
    import datetime
    from dataflows import sort_rows

    dates = [
        datetime.date(2000, 1, 3),
        datetime.date(2010, 1, 2),
        datetime.date(2020, 1, 1),
    ]

    flow = Flow(
        [{'a': day} for day in dates],
        sort_rows(key='{a}'),
    )
    results, _, _ = flow.results()
    # The input is already chronological, so the output must match it.
    assert list(results[0]) == [{'a': day} for day in dates]


def test_duplicate():
Expand Down

0 comments on commit 97be6cf

Please sign in to comment.