/
test_parquet.py
116 lines (83 loc) · 2.76 KB
/
test_parquet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import sys
import pytest
from pandas.util import testing as tm
import ibis
from ibis.file.client import FileDatabase
pa = pytest.importorskip('pyarrow') # isort:skip
pq = pytest.importorskip('pyarrow.parquet') # isort:skip
from ibis.file.parquet import ParquetClient # noqa: E402, isort:skip
from ibis.file.parquet import ParquetTable # noqa: E402, isort:skip
# Module-level pytest marker: skip every test in this file when running on
# Windows (``sys.platform == 'win32'``); the reason string points at the
# tracking issue.
pytestmark = pytest.mark.skipif(
    sys.platform == 'win32', reason='See ibis issue #1698'
)
@pytest.fixture
def transformed(parquet):
    """Return the joined open/close expression reduced to an average column.

    The ``open`` and ``close`` tables of the ``pq`` database are inner-joined
    on ``time`` and ``ticker``.  The result is projected down to ``time``,
    ``ticker`` and ``avg``, where ``avg`` is ``(open + close) / 2``.
    """
    close_table = parquet.pq.close
    open_table = parquet.pq.open

    joined = open_table.inner_join(close_table, ['time', 'ticker'])
    # Keep everything from ``open`` plus the single ``close`` column.
    projected = joined[open_table, close_table.close]
    with_avg = projected.mutate(avg=(projected.open + projected.close) / 2)
    return with_avg[['time', 'ticker', 'avg']]
def test_creation(parquet):
    """Check the on-disk layout behind the ``parquet`` fixture."""
    # The fixture directory already contains files: one entry at the root,
    # and two entries inside the ``pq`` sub-directory.
    root = parquet.client.root
    top_level = list(root.iterdir())
    assert len(top_level) == 1

    pq_dir = root / 'pq'
    assert len(list(pq_dir.iterdir())) == 2

    # Both parquet files are expected to hold 50 rows.
    for filename in ('open.parquet', 'close.parquet'):
        assert len(pq.read_table(str(pq_dir / filename))) == 50
def test_client(tmpdir, data):
    """A client built on a directory lists its parquet files as tables."""
    pq_dir = tmpdir / 'pq'
    pq_dir.mkdir()

    # Write every frame in ``data`` to ``<tmpdir>/pq/<name>.parquet``.
    for name, frame in data.items():
        target = pq_dir / "{}.parquet".format(name)
        pq.write_table(pa.Table.from_pandas(frame), str(target))

    # Construct the client from the directory that contains ``pq``.
    client = ParquetClient(tmpdir)
    assert client.list_databases() == ['pq']
    assert client.database().pq.list_tables() == ['close', 'open']
def test_navigation(parquet):
    """Walk from the root database down to the individual tables."""
    # Root level: a file database whose only attribute is ``pq``.
    assert isinstance(parquet, FileDatabase)
    assert dir(parquet) == ['pq']

    # ``pq`` level: again a file database, exposing both tables via
    # ``dir()`` as well as ``list_tables()``.
    pq_db = parquet.pq
    assert isinstance(pq_db, FileDatabase)
    assert dir(pq_db) == ['close', 'open']
    assert pq_db.list_tables() == ['close', 'open']

    # Table level: each attribute resolves to a parquet-backed table op.
    for table_name in ('open', 'close'):
        table = getattr(pq_db, table_name)
        assert isinstance(table.op(), ParquetTable)
def test_read(parquet, data):
    """Executing the ``close`` table yields the frame held in ``data``."""
    close_table = parquet.pq.close
    assert str(close_table) is not None

    expected = data['close']
    # Run the expression twice; both executions must match ``expected``.
    for _ in range(2):
        tm.assert_frame_equal(close_table.execute(), expected)
def test_write(transformed, tmpdir):
    """Insert an expression as a parquet file, then read it back."""
    expected = transformed.execute()

    # Start from a fresh directory that does not yet contain the target.
    out_dir = tmpdir / 'new_dir'
    out_dir.mkdir()
    out_file = out_dir / 'foo.parquet'
    assert not out_file.exists()

    expr = transformed[['time', 'ticker', 'avg']]
    writer = ibis.parquet.connect(out_dir)
    writer.insert('foo.parquet', expr)
    expr.execute()
    assert out_file.exists()

    # readback
    reader = ParquetClient(str(out_dir)).database()
    assert reader.list_databases() == []
    tm.assert_frame_equal(reader.foo.execute(), expected)

    assert (out_dir / 'foo.parquet').exists()