Handle multi-page maps (#368)
* Ensure that for maps, the second dictionary gets processed in the value column

* Test data for map with multiple pages per column

* Test case for maps with multiple pages per column

* Fix indices

* Cleanup

* Fix test

* Fix test
cmenguy authored and martindurant committed Aug 31, 2018
1 parent e5ced3c commit 845494f
Showing 4 changed files with 21 additions and 4 deletions.
5 changes: 3 additions & 2 deletions fastparquet/core.py
@@ -225,6 +225,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
     my_nan = None

     num = 0
+    row_idx = 0
     while True:
         if (selfmade and hasattr(cmd, 'statistics') and
                 getattr(cmd.statistics, 'null_count', 1) == 0):
@@ -249,8 +250,8 @@ def read_col(column, schema_helper, infile, use_cat=False,
             null = not schema_helper.is_required(cmd.path_in_schema[0])
             null_val = (se.repetition_type !=
                         parquet_thrift.FieldRepetitionType.REQUIRED)
-            num = encoding._assemble_objects(assign, defi, rep, val, dic, d,
-                                             null, null_val, max_defi)
+            row_idx = 1 + encoding._assemble_objects(assign, defi, rep, val, dic, d,
+                                                     null, null_val, max_defi, row_idx)
         elif defi is not None:
             max_defi = schema_helper.max_definition_level(cmd.path_in_schema)
             part = assign[num:num+len(defi)]
4 changes: 2 additions & 2 deletions fastparquet/encoding.py
@@ -224,7 +224,7 @@ def so_far(self):
 Numpy32 = numba.jitclass(spec32)(NumpyIO)


-def _assemble_objects(assign, defi, rep, val, dic, d, null, null_val, max_defi):
+def _assemble_objects(assign, defi, rep, val, dic, d, null, null_val, max_defi, prev_i):
     """Dremel-assembly of arrays of values into lists

     Parameters
@@ -250,7 +250,7 @@ def _assemble_objects(assign, defi, rep, val, dic, d, null, null_val, max_defi):
     if d:
         # dereference dict values
         val = dic[val]
-    i = 0
+    i = prev_i
     vali = 0
     part = []
     started = False
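Taken together, the two hunks thread the output row index through the per-page loop: read_col handles one data page per iteration, and _assemble_objects used to start writing at row 0 on every page, so a map column spanning several pages had each page overwrite the rows assembled from the previous one. Below is a minimal, self-contained sketch of the idea; assemble_page and read_column are invented stand-ins with deliberately simplified Dremel semantics (repetition level 0 starts a record, and no record straddles a page boundary), not fastparquet's real functions:

def assemble_page(assign, rep, val, row_idx):
    # Stand-in for encoding._assemble_objects: write records into
    # `assign` starting at row `row_idx` (the `prev_i` of the fix)
    # and return the index of the last row written.
    i, current = row_idx, []
    for r, v in zip(rep, val):
        if r == 0 and current:        # repetition level 0 => new record
            assign[i] = current
            i += 1
            current = []
        current.append(v)
    assign[i] = current               # flush the record open at page end
    return i

def read_column(pages, num_rows):
    # Stand-in for the page loop in core.read_col.
    assign = [None] * num_rows
    row_idx = 0                       # the fix: carried across pages
    for rep, val in pages:
        # Resume one past the last row the previous page completed;
        # before the fix every page effectively restarted at row 0.
        row_idx = 1 + assemble_page(assign, rep, val, row_idx)
    return assign

pages = [([0, 1, 0], ['a', 'b', 'c']),   # assembles rows 0 and 1
         ([0, 0, 1], ['d', 'e', 'f'])]   # must resume at row 2
print(read_column(pages, 4))  # [['a', 'b'], ['c'], ['d'], ['e', 'f']]

Without the carried row index, the second page would write rows 0 and 1 again and leave rows 2 and 3 as None, which is exactly what the isnull check in the new test below guards against.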
16 changes: 16 additions & 0 deletions fastparquet/test/test_read.py
@@ -350,3 +350,19 @@ def test_no_columns(tempdir):
     expected = pd.DataFrame({"A": [1, 2]})[[]]
     assert len(result) == 2
     pd.testing.assert_frame_equal(result, expected)
+
+def test_map_multipage(tempdir):
+    pf = fastparquet.ParquetFile(os.path.join(TEST_DATA, "map-test.snappy.parquet"))
+    assert pf.count == 3551
+    df = pf.to_pandas()
+    first_row_keys = [u'FoxNews.com', u'News Network', u'mobile technology', u'broadcast', u'sustainability',
+                      u'collective intelligence', u'radio', u'business law', u'LLC', u'telecommunications',
+                      u'FOX News Network']
+    last_row_keys = [u'protests', u'gas mask', u'Pot & Painting Party', u'Denver', u'New Year', u'Anderson Cooper',
+                     u'gas mask bonk', u'digital media', u'marijuana leaf earrings', u'Screengrab', u'gas mask bongs',
+                     u'Randi Kaye', u'Lee Rogers', u'Andy Cohen', u'CNN', u'Times Square', u'Colorado', u'opera',
+                     u'slavery', u'Kathy Griffin', u'marijuana cigarette', u'executive producer']
+    assert len(df) == 3551
+    assert sorted(df["topics"].iloc[0].keys()) == sorted(first_row_keys)
+    assert sorted(df["topics"].iloc[-1].keys()) == sorted(last_row_keys)
+    assert df.isnull().sum().sum() == 0  # ensure every row got converted
Binary file added test-data/map-test.snappy.parquet
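The new fixture exercises a map column whose pages must be stitched back together. The commit does not show how map-test.snappy.parquet was produced; as a purely illustrative sketch, a comparable file could be generated with pyarrow by writing enough map rows with a small data_page_size so the column spills across multiple data pages:

# Hypothetical fixture generator (an assumption, not how the committed
# test file was actually created): the tiny data_page_size forces the
# "topics" map column onto many data pages.
import pyarrow as pa
import pyarrow.parquet as pq

rows = [[("key%d" % (i % 7), "value%d" % i)] for i in range(20000)]
topics = pa.array(rows, type=pa.map_(pa.string(), pa.string()))
pq.write_table(pa.table({"topics": topics}), "map-multipage.snappy.parquet",
               data_page_size=1024, compression="snappy")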
