Skip to content

Commit

Permalink
Merge pull request #202 from martindurant/cleanup
Browse files Browse the repository at this point in the history
Cleanup
  • Loading branch information
martindurant committed Aug 28, 2017
2 parents 99cd107 + c3d752d commit 83e8238
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 141 deletions.
5 changes: 0 additions & 5 deletions fastparquet/converted_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
import logging
import numba
import numpy as np
import os
import pandas as pd
import struct
import sys
from decimal import Decimal
import binascii

from .thrift_structures import parquet_thrift
Expand Down
28 changes: 1 addition & 27 deletions fastparquet/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,10 @@
from .converted_types import convert, typemap
from .schema import _is_list_like, _is_map_like
from .speedups import unpack_byte_array
from .thrift_structures import parquet_thrift
from .thrift_structures import parquet_thrift, read_thrift
from .util import val_to_num, byte_buffer, ex_from_sep


def read_thrift(file_obj, ttype):
"""Read a thrift structure from the given fo."""
from thrift.transport.TTransport import TFileObjectTransport, TBufferedTransport
starting_pos = file_obj.tell()

# set up the protocol chain
ft = TFileObjectTransport(file_obj)
bufsize = 2 ** 16
# for accelerated reading ensure that we wrap this so that the CReadable transport can be used.
bt = TBufferedTransport(ft, bufsize)
pin = TCompactProtocol(bt)

# read out type
obj = ttype()
obj.read(pin)

# The read will actually overshoot due to the buffering that thrift does. Seek backwards to the correct spot,.
buffer_pos = bt.cstringio_buf.tell()
ending_pos = file_obj.tell()
blocks = ((ending_pos - starting_pos) // bufsize) - 1
if blocks < 0:
blocks = 0
file_obj.seek(starting_pos + blocks * bufsize + buffer_pos)
return obj


def _read_page(file_obj, page_header, column_metadata):
"""Read the data page from the given file-object and convert it to raw,
uncompressed bytes (if necessary)."""
Expand Down
5 changes: 0 additions & 5 deletions fastparquet/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,8 @@
from __future__ import print_function

import array
import io
import math
import numba
import numpy as np
import os
import struct
import sys

from .speedups import unpack_byte_array
from .thrift_structures import parquet_thrift
Expand Down
1 change: 0 additions & 1 deletion fastparquet/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from __future__ import unicode_literals

from collections import OrderedDict
import os
from six import text_type

from .thrift_structures import parquet_thrift
Expand Down
7 changes: 1 addition & 6 deletions fastparquet/test/test_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,19 @@
from __future__ import print_function
from __future__ import unicode_literals

import io
from itertools import product
import json
import numpy as np
import os
import pandas as pd
import shutil
import sys
import tempfile
import unittest

import pandas as pd
import pytest

import fastparquet
from fastparquet import writer, core

from fastparquet.test.util import sql, s3, TEST_DATA
from fastparquet.test.util import TEST_DATA, s3


@pytest.yield_fixture()
Expand Down
17 changes: 17 additions & 0 deletions fastparquet/test/test_thrift_structures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import os
import pickle
from fastparquet import ParquetFile, parquet_thrift
from fastparquet.test.util import TEST_DATA
from fastparquet.schema import schema_tree


def test_serialize():
fn = os.path.join(TEST_DATA, "nation.impala.parquet")
pf = ParquetFile(fn)
fmd2 = pickle.loads(pickle.dumps(pf.fmd))
schema_tree(fmd2.schema) # because we added fake fields when loading pf
assert fmd2 == pf.fmd

rg = pf.row_groups[0]
rg2 = pickle.loads(pickle.dumps(rg))
assert rg == rg2
132 changes: 126 additions & 6 deletions fastparquet/thrift_structures.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,130 @@
import os
import io
from thrift.protocol.TCompactProtocol import TCompactProtocolAccelerated as TCompactProtocol
from thrift.protocol.TProtocol import TProtocolException

# import thriftpy
from .parquet_thrift.parquet import ttypes as parquet_thrift


# THRIFT_FILE = os.path.join(os.path.dirname(__file__), "parquet.thrift")
# parquet_thrift = thriftpy.load(THRIFT_FILE, module_name="parquet_thrift") # pylint: disable=invalid-name
def read_thrift(file_obj, ttype):
"""Read a thrift structure from the given fo."""
from thrift.transport.TTransport import TFileObjectTransport, TBufferedTransport
starting_pos = file_obj.tell()

import fastparquet.parquet_thrift.parquet.ttypes as parquet_thrift

# set up the protocol chain
ft = TFileObjectTransport(file_obj)
bufsize = 2 ** 16
# for accelerated reading ensure that we wrap this so that the CReadable transport can be used.
bt = TBufferedTransport(ft, bufsize)
pin = TCompactProtocol(bt)

# read out type
obj = ttype()
obj.read(pin)

# The read will actually overshoot due to the buffering that thrift does. Seek backwards to the correct spot,.
buffer_pos = bt.cstringio_buf.tell()
ending_pos = file_obj.tell()
blocks = ((ending_pos - starting_pos) // bufsize) - 1
if blocks < 0:
blocks = 0
file_obj.seek(starting_pos + blocks * bufsize + buffer_pos)
return obj


def write_thrift(fobj, thrift):
"""Write binary compact representation of thiftpy structured object
Parameters
----------
fobj: open file-like object (binary mode)
thrift: thriftpy object to write
Returns
-------
Number of bytes written
"""
t0 = fobj.tell()
pout = TCompactProtocol(fobj)
try:
thrift.write(pout)
fail = False
except TProtocolException as e:
typ, val, tb = sys.exc_info()
frames = []
while tb is not None:
frames.append(tb)
tb = tb.tb_next
frame = [tb for tb in frames if 'write_struct' in str(tb.tb_frame.f_code)]
variables = frame[0].tb_frame.f_locals
obj = variables['obj']
name = variables['fname']
fail = True
if fail:
raise ParquetException('Thrift parameter validation failure %s'
' when writing: %s-> Field: %s' % (
val.args[0], obj, name
))
return fobj.tell() - t0


def is_thrift_item(item):
return hasattr(item, 'thrift_spec') and hasattr(item, 'read')


def thrift_copy(structure):
"""
Recursively copy a thriftpy structure
"""
base = structure.__class__()
for key in dir(structure):
if key.startswith('_') or key in ['thrift_spec', 'read', 'write',
'default_spec', 'validate']:
continue
val = getattr(structure, key)
if isinstance(val, list):
setattr(base, key, [thrift_copy(item)
if is_thrift_item(item)
else item for item in val])
elif is_thrift_item(val):
setattr(base, key, thrift_copy(val))
else:
setattr(base, key, val)
return base


def thrift_print(structure, offset=0):
"""
Handy recursive text ouput for thrift structures
"""
if not is_thrift_item(structure):
return str(structure)
s = str(structure.__class__) + '\n'
for key in dir(structure):
if key.startswith('_') or key in ['thrift_spec', 'read', 'write',
'default_spec', 'validate']:
continue
s = s + ' ' * offset + key + ': ' + thrift_print(getattr(structure, key)
, offset+2) + '\n'
return s


for cls in dir(parquet_thrift):
if cls[0].isupper():
c = getattr(parquet_thrift, cls)
c.__str__ = thrift_print
c.__repr__ = thrift_print


def __getstate__(ob):
b = io.BytesIO()
write_thrift(b, ob)
b.seek(0)
return b.read()

for t in [parquet_thrift.FileMetaData, parquet_thrift.RowGroup]:
def __setstate__(ob, d, t=t):
b = io.BytesIO(d)
o = read_thrift(b, t)
ob.__dict__ = o.__dict__
t.__setstate__ = __setstate__
t.__getstate__ = __getstate__
52 changes: 4 additions & 48 deletions fastparquet/util.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import ast
import os, os.path
import os
import os.path
import shutil
import pandas as pd
import pytest
import re
import tempfile
import thrift
import sys
import six
from thrift.protocol.TBase import TBase

from .thrift_structures import thrift_copy
try:
from pandas.api.types import is_categorical_dtype
except ImportError:
# Pandas <= 0.18.1
from pandas.core.common import is_categorical_dtype


PY2 = six.PY2
PY3 = six.PY3
STR_TYPE = six.string_types[0] # 'str' for Python3, 'basestring' for Python2
Expand Down Expand Up @@ -80,49 +79,6 @@ def ensure_bytes(s):
return s.encode('utf-8') if isinstance(s, str) else s


def thrift_print(structure, offset=0):
"""
Handy recursive text ouput for thrift structures
"""
if not isinstance(structure, TBase):
return str(structure)
s = str(structure.__class__) + '\n'
for key in dir(structure):
if key.startswith('_') or key in ['thrift_spec', 'read', 'write',
'default_spec']:
continue
s = s + ' ' * offset + key + ': ' + thrift_print(getattr(structure, key)
, offset+2) + '\n'
return s
TBase.__str__ = thrift_print
TBase.__repr__ = thrift_print


def is_thrift_item(item):
return isinstance(item, TBase) or (hasattr(item, 'thrift_spec') and hasattr(item, 'read'))


def thrift_copy(structure):
"""
Recursively copy a thriftpy structure
"""
base = structure.__class__()
for key in dir(structure):
if key.startswith('_') or key in ['thrift_spec', 'read', 'write',
'default_spec', 'validate']:
continue
val = getattr(structure, key)
if isinstance(val, list):
setattr(base, key, [thrift_copy(item)
if is_thrift_item(item)
else item for item in val])
elif is_thrift_item(val):
setattr(base, key, thrift_copy(val))
else:
setattr(base, key, val)
return base


def index_like(index):
"""
Does index look like a default range index?
Expand Down
44 changes: 1 addition & 43 deletions fastparquet/writer.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
from __future__ import print_function

import io
import json
import numpy as np
import os
import pandas as pd
import re
import shutil
import struct
import sys
import thrift
import warnings

import numba

from thrift.protocol.TCompactProtocol import TCompactProtocolAccelerated as TCompactProtocol
from thrift.protocol.TProtocol import TProtocolException
from .thrift_structures import parquet_thrift, write_thrift
try:
from pandas.api.types import is_categorical_dtype
except ImportError:
Expand Down Expand Up @@ -239,42 +233,6 @@ def time_shift(indata, outdata, factor=1000): # pragma: no cover
outdata[i] = indata[i] // factor


def write_thrift(fobj, thrift):
"""Write binary compact representation of thiftpy structured object
Parameters
----------
fobj: open file-like object (binary mode)
thrift: thriftpy object to write
Returns
-------
Number of bytes written
"""
t0 = fobj.tell()
pout = TCompactProtocol(fobj)
try:
thrift.write(pout)
fail = False
except TProtocolException as e:
typ, val, tb = sys.exc_info()
frames = []
while tb is not None:
frames.append(tb)
tb = tb.tb_next
frame = [tb for tb in frames if 'write_struct' in str(tb.tb_frame.f_code)]
variables = frame[0].tb_frame.f_locals
obj = variables['obj']
name = variables['fname']
fail = True
if fail:
raise ParquetException('Thrift parameter validation failure %s'
' when writing: %s-> Field: %s' % (
val.args[0], obj, name
))
return fobj.tell() - t0


def encode_plain(data, se):
"""PLAIN encoding; returns byte representation"""
out = convert(data, se)
Expand Down

0 comments on commit 83e8238

Please sign in to comment.