Commit bdce556

Merge pull request #29 from mrocklin/read-row-group
Read row group
martindurant committed Nov 18, 2016
2 parents: be061d1 + 2d0a9ef
Showing 2 changed files with 55 additions and 36 deletions.

fastparquet/api.py: 14 additions, 33 deletions
@@ -105,12 +105,22 @@ def _read_partitions(self):
             cats.setdefault(key, set()).add(val)
         self.cats = {key: list(v) for key, v in cats.items()}
 
+    def row_group_filename(self, rg):
+        return self.sep.join([os.path.dirname(self.fn),
+                              rg.columns[0].file_path])
+
     def read_row_group_file(self, rg, columns, categories):
         """ Open file for reading, and process it as a row-group """
-        ofname = self.sep.join([os.path.dirname(self.fn),
-                                rg.columns[0].file_path])
-        with self.open(ofname, 'rb') as f:
-            return self.read_row_group(rg, columns, categories, infile=f)
+        fn = self.row_group_filename(rg)
+        return core.read_row_group_file(fn, rg, columns, categories,
+                                        self.helper, self.cats, open=self.open)
+
+    def read_row_group(self, rg, columns, categories, infile=None):
+        """
+        Access row-group in a file and read some columns into a data-frame.
+        """
+        return core.read_row_group(infile, rg, columns, categories,
+                                   self.helper, self.cats)
 
     def grab_cats(self, columns, row_group_index=0):
         """ Read dictionaries of first row_group
@@ -144,35 +154,6 @@ def grab_cats(self, columns, row_group_index=0):
                                 grab_dict=True)
         return out
 
-    def read_row_group(self, rg, columns, categories, infile=None):
-        """
-        Access row-group in a file and read some columns into a data-frame.
-        """
-        out = {}
-
-        for column in rg.columns:
-            name = ".".join(column.meta_data.path_in_schema)
-            if name not in columns:
-                continue
-
-            use = name in categories if categories is not None else False
-            s = core.read_col(column, self.helper, infile, use_cat=use)
-            out[name] = s
-        out = pd.DataFrame(out, columns=columns)
-
-        # apply categories
-        for cat in self.cats:
-            # *Hard assumption*: all chunks in a row group have the
-            # same partition (correct for spark/hive)
-            partitions = re.findall("([a-zA-Z_]+)=([^/]+)/",
-                                    rg.columns[0].file_path)
-            val = [p[1] for p in partitions if p[0] == cat][0]
-            codes = np.empty(rg.num_rows, dtype=np.int16)
-            codes[:] = self.cats[cat].index(val)
-            out[cat] = pd.Categorical.from_codes(
-                codes, [val_to_num(c) for c in self.cats[cat]])
-        return out
 
     def to_pandas(self, columns=None, categories=None, filters=[]):
         """
         Read data from parquet into a Pandas dataframe.
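
With this change the reading logic lives in core, and the ParquetFile methods above become thin wrappers: row_group_filename resolves the data file a row group points at, and read_row_group_file opens that file and delegates to core. A minimal usage sketch; the mydata/ path and column names are placeholders, and pf.row_groups is assumed to expose the dataset's row-group metadata as it is used elsewhere in fastparquet:

    from fastparquet import ParquetFile

    pf = ParquetFile('mydata/_metadata')   # placeholder dataset path
    rg = pf.row_groups[0]                  # thrift metadata for one row group
    # Read only the file backing this row group into a DataFrame.
    df = pf.read_row_group_file(rg, columns=['x', 'y'], categories=None)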

fastparquet/core.py: 41 additions, 3 deletions
@@ -1,15 +1,18 @@
 import io
-import numpy as np
 import os
-import pandas as pd
 import re
 import struct
 
+import numpy as np
+import pandas as pd
 from thriftpy.protocol.compact import TCompactProtocolFactory
 
 from . import encoding
+from .compression import decompress_data
 from .converted_types import convert
 from .thrift_filetransport import TFileTransport
 from .thrift_structures import parquet_thrift
-from .compression import decompress_data
 from .util import val_to_num
 
 
 def read_thrift(file_obj, ttype):
@@ -246,3 +249,38 @@ def read_col(column, schema_helper, infile, use_cat=False,
     if se.converted_type is not None:
         out = convert(out, se)
     return out
+
+
+def read_row_group_file(fn, rg, *args, open=open):
+    with open(fn, mode='rb') as f:
+        return read_row_group(f, rg, *args)
+
+
+def read_row_group(file, rg, columns, categories, schema_helper, cats):
+    """
+    Access row-group in a file and read some columns into a data-frame.
+    """
+    out = {}
+
+    for column in rg.columns:
+        name = ".".join(column.meta_data.path_in_schema)
+        if name not in columns:
+            continue
+
+        use = name in categories if categories is not None else False
+        s = read_col(column, schema_helper, file, use_cat=use)
+        out[name] = s
+    out = pd.DataFrame(out, columns=columns)
+
+    # apply categories
+    for cat in cats:
+        # *Hard assumption*: all chunks in a row group have the
+        # same partition (correct for spark/hive)
+        partitions = re.findall("([a-zA-Z_]+)=([^/]+)/",
+                                rg.columns[0].file_path)
+        val = [p[1] for p in partitions if p[0] == cat][0]
+        codes = np.empty(rg.num_rows, dtype=np.int16)
+        codes[:] = cats[cat].index(val)
+        out[cat] = pd.Categorical.from_codes(
+            codes, [val_to_num(c) for c in cats[cat]])
+    return out
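
Because the opener is passed in as a parameter, core.read_row_group_file is not tied to the local filesystem: any callable with open(path, mode) semantics works, which is how ParquetFile forwards its own self.open. A sketch of that injection point, assuming the same hypothetical mydata/_metadata dataset as above; the logging wrapper and column names are illustrative only:

    from fastparquet import ParquetFile, core

    pf = ParquetFile('mydata/_metadata')    # placeholder dataset path
    rg = pf.row_groups[0]

    def logging_open(path, mode='rb'):
        # Anything that returns a readable binary file object will do.
        print('opening', path)
        return open(path, mode)

    fn = pf.row_group_filename(rg)
    df = core.read_row_group_file(fn, rg, ['x', 'y'], None,
                                  pf.helper, pf.cats, open=logging_open)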
