Merge pull request #39 from martindurant/optimizations

Optimizations: column encodings

martindurant committed Dec 8, 2016
2 parents b4a7380 + b5eee6b commit 03f723d
Showing 5 changed files with 99 additions and 39 deletions.
8 changes: 6 additions & 2 deletions fastparquet/api.py
@@ -244,13 +244,17 @@ def to_pandas(self, columns=None, categories=None, filters=[],
         Pandas data-frame
         """
         tot = self.iter_row_groups(columns, categories, filters, index)
+        num_row_groups = len(self.filter_row_groups(filters))
         columns = columns or self.columns

         # TODO: if categories vary from one rg to next, need
         # pandas.types.concat.union_categoricals
         try:
-            return pd.concat(tot, ignore_index=index is None)
-        except ValueError:
+            if num_row_groups > 1:
+                return pd.concat(tot, ignore_index=index is None, copy=False)
+            else:
+                return next(iter(tot))
+        except (ValueError, StopIteration):
             return pd.DataFrame(columns=columns + list(self.cats))

     @property
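Context for the to_pandas change: pd.concat builds a new frame (a full copy) even when given a single input, so when only one row group survives the filter the frame is now returned directly, and copy=False trims copying when several groups are concatenated. A minimal standalone sketch of the idea, with a hypothetical frames list standing in for the iter_row_groups generator:

import pandas as pd

def concat_row_groups(frames, ignore_index=True):
    # frames: list of DataFrames, one per row group (hypothetical stand-in
    # for the generator returned by iter_row_groups)
    frames = list(frames)
    if len(frames) > 1:
        # copy=False lets pandas reuse the input blocks where possible
        return pd.concat(frames, ignore_index=ignore_index, copy=False)
    # a single row group needs no concatenation, and hence no copy, at all
    return frames[0]

df = concat_row_groups([pd.DataFrame({'a': [1, 2]})])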
2 changes: 1 addition & 1 deletion fastparquet/benchmarks/columns.py
@@ -25,7 +25,7 @@ def time_column():
     d = pd.DataFrame({'w': pd.Categorical(np.random.choice(
             ['hi', 'you', 'people'], size=n)),
                       'x': r.view('timedelta64[ns]'),
-                      'y': r.view('float64'),
+                      'y': r / np.random.randint(1, 1000, size=n),
                       'z': np.random.randint(0, 127, size=n,
                                              dtype=np.uint8)})

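A note on the benchmark tweak: reinterpreting random integer bits as float64 produces NaNs, infinities and denormals, which make timings unrepresentative; dividing by small random integers yields ordinary finite floats. A quick illustration, assuming r is an int64 array as in the benchmark:

import numpy as np

r = np.random.randint(0, 2**63 - 1, size=5, dtype=np.int64)

# reinterpreting raw int64 bits as float64 yields NaNs, infs and denormals
bad = r.view('float64')

# dividing by random small integers yields ordinary finite float64 values
good = r / np.random.randint(1, 1000, size=5)
print(bad)
print(good)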
38 changes: 27 additions & 11 deletions fastparquet/core.py
@@ -87,7 +87,8 @@ def read_rep(io_obj, daph, helper, metadata):
     return repetition_levels


-def read_data_page(f, helper, header, metadata, skip_nulls=False):
+def read_data_page(f, helper, header, metadata, skip_nulls=False,
+                   selfmade=False):
     """Read a data page: definitions, repetitions, values (in order)

     Only values are guaranteed to exist, e.g., for a top-level, required
@@ -112,18 +113,26 @@ def read_data_page(f, helper, header, metadata, skip_nulls=False):
                             metadata.type,
                             int(daph.num_values - num_nulls),
                             width=width)
-    elif daph.encoding == parquet_thrift.Encoding.PLAIN_DICTIONARY:
+    elif daph.encoding in [parquet_thrift.Encoding.PLAIN_DICTIONARY,
+                           parquet_thrift.Encoding.RLE]:
         # bit_width is stored as single byte.
-        bit_width = io_obj.read_byte()
-        if bit_width:
+        if daph.encoding == parquet_thrift.Encoding.RLE:
+            bit_width = helper.schema_element(
+                    metadata.path_in_schema[-1]).type_length
+        else:
+            bit_width = io_obj.read_byte()
+        if bit_width in [8, 16, 32] and selfmade:
+            num = (encoding.read_unsigned_var_int(io_obj) >> 1) * 8
+            values = io_obj.read(num * bit_width // 8).view('int%i' % bit_width)
+        elif bit_width:
             values = encoding.Numpy32(np.zeros(daph.num_values,
                                                dtype=np.int32))
             # length is simply "all data left in this page"
             encoding.read_rle_bit_packed_hybrid(
                 io_obj, bit_width, io_obj.len-io_obj.loc, o=values)
             values = values.data[:daph.num_values-num_nulls]
         else:
-            values = np.zeros(daph.num_values-num_nulls, dtype=np.int64)
+            values = np.zeros(daph.num_values-num_nulls, dtype=np.int8)
     else:
         raise NotImplementedError('Encoding %s' % daph.encoding)
     return definition_levels, repetition_levels, values
@@ -205,7 +214,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
         else:
             skip_nulls = False
         defi, rep, val = read_data_page(infile, schema_helper, ph, cmd,
-                                        skip_nulls)
+                                        skip_nulls, selfmade=selfmade)
         d = ph.data_page_header.encoding == parquet_thrift.Encoding.PLAIN_DICTIONARY
         out.append((defi, rep, val, d))
         num += len(defi) if defi is not None else len(val)
@@ -259,7 +268,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
         final[start:start+len(val)] = cval
         start += len(val)
     if all_dict:
-        final = pd.Categorical.from_codes(final, categories=dic)
+        final = pd.Categorical(final, categories=dic, fastpath=True)
     return final


@@ -301,12 +310,19 @@ def read_row_group(file, rg, columns, categories, schema_helper, cats,
     out = read_row_group_arrays(file, rg, columns, categories, schema_helper,
                                 cats, selfmade)

-    if index is not None and index in columns:
-        i = out.pop(index)
+    i = out.pop(index, None)
+    dtypes = {str(d.dtype) for d in out.values()}
+    if len(out) == 1 and dtypes != {'category'}:
+        c = list(out)[0]
+        out = pd.DataFrame(out[c].reshape(-1, 1), columns=[c], index=i)
+    elif len(dtypes) == 1 and dtypes != {'category'}:
+        columns = list(sorted(out))
+        out = pd.DataFrame(np.hstack([out[k].reshape(-1, 1) for k in columns]),
+                           columns=columns)
+    else:
         out = pd.DataFrame(out, index=i)
+    if i is not None:
         out.index.name = index
-    else:
-        out = pd.DataFrame(out, columns=columns)

     # apply categories
     for cat in cats:
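The selfmade fast path in read_data_page leans on the Parquet RLE/bit-packed hybrid format: each run starts with an unsigned varint whose low bit flags a bit-packed run and whose remaining bits count groups of 8 values, so (varint >> 1) * 8 is the value count. When the bit width is a whole number of bytes (8, 16 or 32) and fastparquet itself wrote the page as a single run, the payload is just the values back to back and can be viewed without any bit unpacking. A minimal sketch of that decode (function name hypothetical, varint decoded in pure Python):

import numpy as np

def read_bitpacked_run_aligned(buf, bit_width):
    """Sketch of the byte-aligned fast path, assuming `buf` holds a single
    bit-packed run of the RLE/bit-packed hybrid encoding and bit_width is
    8, 16 or 32 (i.e. a whole number of bytes per value)."""
    # decode the unsigned varint header
    header, shift, pos = 0, 0, 0
    while True:
        byte = buf[pos]; pos += 1
        header |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    assert header & 1, "low bit set means a bit-packed (not RLE) run"
    num = (header >> 1) * 8          # count is stored in groups of 8 values
    nbytes = num * bit_width // 8
    return np.frombuffer(buf[pos:pos + nbytes], dtype='int%d' % bit_width)

# one bit-packed run of 8 int8 values: header = (1 group << 1) | 1 = 0x03
buf = bytes([0x03]) + bytes(range(8))
print(read_bitpacked_run_aligned(buf, 8))   # -> [0 1 2 3 4 5 6 7]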
21 changes: 18 additions & 3 deletions fastparquet/test/test_writer.py
@@ -169,6 +169,7 @@ def test_roundtrip_complex(tempdir, scheme,):
     import datetime
     data = pd.DataFrame({'ui32': np.arange(1000, dtype=np.uint32),
                          'i16': np.arange(1000, dtype=np.int16),
+                         'ui8': np.array([1, 2, 3, 4]*250, dtype=np.uint8),
                          'f16': np.arange(1000, dtype=np.float16),
                          'dicts': [{'oi': 'you'}] * 1000,
                          't': [datetime.datetime.now()] * 1000,
@@ -350,7 +351,6 @@ def test_empty_groupby(tempdir):
     assert row.b in list(df[(df.a==row.a)&(df.c==row.c)].b)


-
 @pytest.mark.parametrize('compression', ['GZIP',
                                          'gzip',
                                          None,
Expand Down Expand Up @@ -469,6 +469,7 @@ def test_auto_null(tempdir):
'b': [1., 2., 3., np.nan],
'c': pd.to_timedelta([1, 2, 3, np.nan], unit='ms'),
'd': ['a', 'b', 'c', None]})
df['e'] = df['d'].astype('category')
fn = os.path.join(tmp, "test.parq")

with pytest.raises(TypeError):
@@ -480,14 +481,28 @@ def test_auto_null(tempdir):
     for col in pf.schema[2:]:
         assert col.repetition_type == parquet_thrift.FieldRepetitionType.OPTIONAL
     assert pf.schema[1].repetition_type == parquet_thrift.FieldRepetitionType.REQUIRED
-    df2 = pf.to_pandas()
+    df2 = pf.to_pandas(categories=['e'])
     tm.assert_frame_equal(df, df2)

     write(fn, df, has_nulls=None)
     pf = ParquetFile(fn)
     for col in pf.schema[1:3]:
         assert col.repetition_type == parquet_thrift.FieldRepetitionType.REQUIRED
     assert pf.schema[4].repetition_type == parquet_thrift.FieldRepetitionType.OPTIONAL
-    df2 = pf.to_pandas()
+    df2 = pf.to_pandas(categories=['e'])
     tm.assert_frame_equal(df, df2)

+
+@pytest.mark.parametrize('n', (10, 127, 2**8 + 1, 2**16 + 1))
+def test_many_categories(tempdir, n):
+    tmp = str(tempdir)
+    cats = np.arange(n)
+    codes = np.random.randint(0, n, size=1000000)
+    df = pd.DataFrame({'x': pd.Categorical.from_codes(codes, cats)})
+    fn = os.path.join(tmp, "test.parq")
+
+    write(fn, df, has_nulls=False)
+    pf = ParquetFile(fn)
+    out = pf.to_pandas(categories=['x'])
+
+    tm.assert_frame_equal(df, out)
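The n values in test_many_categories are chosen to cross code-width boundaries: pandas stores categorical codes in the narrowest signed integer type that fits the number of categories, which is exactly what the writer's new RLE path keys on. A quick check (the exact pandas cutoffs are an assumption, not taken from this diff):

import numpy as np
import pandas as pd

# pandas picks the narrowest signed dtype for categorical codes, so these
# n values straddle the 8/16/32-bit widths the RLE fast path cares about
for n in (10, 127, 2**8 + 1, 2**16 + 1):
    cat = pd.Categorical.from_codes(np.zeros(4, dtype=int), np.arange(n))
    print(n, cat.codes.dtype)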
69 changes: 47 additions & 22 deletions fastparquet/writer.py
@@ -155,7 +155,7 @@ def find_type(data, convert=False, fixed_text=None):
     return se, type, out


-@numba.njit()
+@numba.njit(nogil=True)
 def time_shift(indata, outdata, factor=1000):
     for i in range(len(indata)):
         if indata[i] == nat:
@@ -317,21 +317,39 @@ def encode_rle_bp(data, width, o, withlength=False):
     o.loc = end


+def encode_rle(data, se, fixed_text=None):
+    if data.dtype.kind not in ['i', 'u']:
+        raise ValueError('RLE/bitpack encoding only works for integers')
+    if se.type_length in [8, 16]:
+        o = encoding.Numpy8(np.empty(10, dtype=np.uint8))
+        bit_packed_count = (len(data) + 7) // 8
+        encode_unsigned_varint(bit_packed_count << 1 | 1, o)  # write run header
+        return o.so_far().tostring() + data.values.tostring()
+    else:
+        m = data.max()
+        width = 0
+        while m:
+            m >>= 1
+            width += 1
+        l = (len(data) * width + 7) // 8 + 10
+        o = encoding.Numpy8(np.empty(l, dtype='uint8'))
+        encode_rle_bp(data, width, o)
+        return o.so_far().tostring()
+
+
 def encode_dict(data, se, _):
-    """ The data part of dictionary encoding is always int32s, with RLE/bitpack
+    """ The data part of dictionary encoding is always int8, with RLE/bitpack
     """
-    # TODO: should width be a parameter equal to len(cats) ?
-    width = encoding.width_from_max_int(data.max())
-    ldata = ((len(data) + 7) // 8) * width + 11
-    i = data.values.astype(np.int32, copy=False)
-    out = encoding.Numpy8(np.empty(ldata, dtype=np.uint8))
-    out.write_byte(width)
-    encode_rle_bp(i, width, out)
-    return out.so_far().tostring()
+    width = data.values.dtype.itemsize * 8
+    o = encoding.Numpy8(np.empty(10, dtype=np.uint8))
+    o.write_byte(width)
+    bit_packed_count = (len(data) + 7) // 8
+    encode_unsigned_varint(bit_packed_count << 1 | 1, o)  # write run header
+    return o.so_far().tostring() + data.values.tostring()

 encode = {
     'PLAIN': encode_plain,
-    'RLE': encode_rle_bp,
+    'RLE': encode_rle,
     'PLAIN_DICTIONARY': encode_dict,
     # 'DELTA_BINARY_PACKED': encode_delta
 }
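Both encode_rle (for byte-width integer types) and the new encode_dict emit a single bit-packed run: a varint header of (groups_of_8 << 1) | 1 followed by the raw little-endian value bytes. This works because bit-packing at 8-, 16- or 32-bit widths is byte-identical to the values themselves. A sketch of that layout (helper name hypothetical):

import numpy as np

def encode_bitpacked_run_bytes(values):
    """Sketch of the single-run encoding above, for a NumPy integer array
    whose itemsize is a whole number of bytes.
    Layout: varint header ((ngroups << 1) | 1), then the raw value bytes."""
    ngroups = (len(values) + 7) // 8    # bit-packed runs count groups of 8
    header = (ngroups << 1) | 1         # low bit 1 marks a bit-packed run
    out = bytearray()
    while header > 0x7F:                # unsigned varint, 7 bits per byte
        out.append((header & 0x7F) | 0x80)
        header >>= 7
    out.append(header)
    return bytes(out) + values.tobytes()

codes = np.array([0, 1, 2, 1, 0, 3, 2, 1], dtype=np.int8)
print(encode_bitpacked_run_bytes(codes).hex())  # 03 then the 8 code bytes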
@@ -388,7 +406,10 @@ def write_column(f, data, selement, encoding='PLAIN', compression=None):
     tot_rows = len(data)

     if has_nulls:
-        num_nulls = data.count() - len(data)
+        if str(data.dtype) == 'category':
+            num_nulls = (data.cat.codes == -1).sum()
+        else:
+            num_nulls = len(data) - data.count()
         definition_data, data = make_definitions(data, num_nulls == 0)
     else:
         definition_data = b""
@@ -400,6 +421,7 @@ def write_column(f, data, selement, encoding='PLAIN', compression=None):
     cats = False
     name = data.name
     diff = 0
+    max, min = None, None

     if str(data.dtype) == 'category':
         dph = parquet_thrift.DictionaryPageHeader(
@@ -423,29 +445,32 @@ def write_column(f, data, selement, encoding='PLAIN', compression=None):
         write_thrift(f, ph)
         f.write(bdata)
         try:
-            max, min = data.max(), data.min()
-            max = encode['PLAIN'](pd.Series([max]), selement,
-                                  fixed_text=fixed_text)
-            min = encode['PLAIN'](pd.Series([min]), selement,
-                                  fixed_text=fixed_text)
+            if num_nulls == 0:
+                max, min = data.values.max(), data.values.min()
+                max = encode['PLAIN'](pd.Series([max]), selement,
+                                      fixed_text=fixed_text)
+                min = encode['PLAIN'](pd.Series([min]), selement,
+                                      fixed_text=fixed_text)
         except TypeError:
-            max, min = None, None
-        data = data.cat.codes.astype(np.int32)
+            pass
+        data = data.cat.codes
         cats = True
         encoding = "PLAIN_DICTIONARY"
+    elif str(data.dtype) in ['int8', 'int16', 'uint8', 'uint16']:
+        encoding = "RLE"

     start = f.tell()
     bdata = definition_data + repetition_data + encode[encoding](data, selement,
                                                                  fixed_text)
     try:
-        if encoding != 'PLAIN_DICTIONARY':
-            max, min = data.max(), data.min()
+        if encoding != 'PLAIN_DICTIONARY' and num_nulls == 0:
+            max, min = data.values.max(), data.values.min()
             max = encode['PLAIN'](pd.Series([max], dtype=data.dtype), selement,
                                   fixed_text=fixed_text)
             min = encode['PLAIN'](pd.Series([min], dtype=data.dtype), selement,
                                   fixed_text=fixed_text)
     except TypeError:
-        max, min = None, None
+        pass

     dph = parquet_thrift.DataPageHeader(
         num_values=tot_rows,
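On the num_nulls change in write_column: missing entries in a pandas categorical are stored as code -1, so nulls can be counted from the codes without touching the values; for other dtypes the corrected len(data) - data.count() form is used (the old expression had the operands reversed and went negative). For example:

import numpy as np
import pandas as pd

s = pd.Series(['a', 'b', None, 'a']).astype('category')
# missing values in a categorical are stored as code -1, so nulls can be
# counted without materialising the values themselves
num_nulls = (s.cat.codes == -1).sum()
print(num_nulls)            # 1
print(len(s) - s.count())   # same answer via the generic path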
