/
test_data_frame.py
242 lines (194 loc) · 7.01 KB
/
test_data_frame.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# coding=utf-8
#
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis-python
#
# Most of this work is copyright (C) 2013-2018 David R. MacIver
# (david@drmaciver.com), but it contains contributions by others. See
# CONTRIBUTING.rst for a full list of people who may hold copyright, and
# consult the git log if you need to determine who owns an individual
# contribution.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
#
# END HEADER
from __future__ import division, print_function, absolute_import
import numpy as np
import pytest
import hypothesis.strategies as st
import hypothesis.extra.numpy as npst
import hypothesis.extra.pandas as pdst
from hypothesis import given, reject
from hypothesis.types import RandomWithSeed as Random
from tests.common.debug import minimal, find_any
from tests.pandas.helpers import supported_by_pandas
@given(pdst.data_frames([
pdst.column('a', dtype=int),
pdst.column('b', dtype=float),
]))
def test_can_have_columns_of_distinct_types(df):
assert df['a'].dtype == np.dtype(int)
assert df['b'].dtype == np.dtype(float)
@given(pdst.data_frames(
[pdst.column(dtype=int)],
index=pdst.range_indexes(min_size=1, max_size=5)))
def test_respects_size_bounds(df):
assert 1 <= len(df) <= 5
@given(pdst.data_frames(pdst.columns(['A', 'B'], dtype=float)))
def test_can_specify_just_column_names(df):
df['A']
df['B']
@given(pdst.data_frames(pdst.columns(2, dtype=float)))
def test_can_specify_just_column_count(df):
df[0]
df[1]
@given(pdst.data_frames(
rows=st.fixed_dictionaries({'A': st.integers(1, 10), 'B': st.floats()}))
)
def test_gets_the_correct_data_shape_for_just_rows(table):
assert table['A'].dtype == np.dtype('int64')
assert table['B'].dtype == np.dtype(float)
@given(pdst.data_frames(
columns=pdst.columns(['A', 'B'], dtype=int),
rows=st.lists(st.integers(0, 1000), min_size=2, max_size=2).map(sorted),
))
def test_can_specify_both_rows_and_columns_list(d):
assert d['A'].dtype == np.dtype(int)
assert d['B'].dtype == np.dtype(int)
for _, r in d.iterrows():
assert r['A'] <= r['B']
@given(pdst.data_frames(
columns=pdst.columns(['A', 'B'], dtype=int),
rows=st.lists(
st.integers(0, 1000), min_size=2, max_size=2).map(sorted).map(tuple),
))
def test_can_specify_both_rows_and_columns_tuple(d):
assert d['A'].dtype == np.dtype(int)
assert d['B'].dtype == np.dtype(int)
for _, r in d.iterrows():
assert r['A'] <= r['B']
@given(pdst.data_frames(
columns=pdst.columns(['A', 'B'], dtype=int),
rows=st.lists(st.integers(0, 1000), min_size=2, max_size=2).map(
lambda x: {'A': min(x), 'B': max(x)}),
))
def test_can_specify_both_rows_and_columns_dict(d):
assert d['A'].dtype == np.dtype(int)
assert d['B'].dtype == np.dtype(int)
for _, r in d.iterrows():
assert r['A'] <= r['B']
@given(
pdst.data_frames([pdst.column('A', fill=st.just(float('nan')),
dtype=float,
elements=st.floats(allow_nan=False))],
rows=st.builds(dict)))
def test_can_fill_in_missing_elements_from_dict(df):
assert np.isnan(df['A']).all()
subsets = ['', 'A', 'B', 'C', 'AB', 'AC', 'BC', 'ABC']
@pytest.mark.parametrize('disable_fill', subsets)
@pytest.mark.parametrize('non_standard_index', [True, False])
def test_can_minimize_based_on_two_columns_independently(
disable_fill, non_standard_index
):
columns = [
pdst.column(
name, dtype=bool,
fill=st.nothing() if name in disable_fill else None,
)
for name in ['A', 'B', 'C']
]
x = minimal(
pdst.data_frames(
columns,
index=pdst.indexes(dtype=int) if non_standard_index else None,
),
lambda x: x['A'].any() and x['B'].any() and x['C'].any(),
random=Random(0),
)
assert len(x['A']) == 1
assert x['A'][0] == 1
assert x['B'][0] == 1
assert x['C'][0] == 1
@st.composite
def column_strategy(draw):
name = draw(st.none() | st.text())
dtype = draw(npst.scalar_dtypes().filter(supported_by_pandas))
pass_dtype = not draw(st.booleans())
if pass_dtype:
pass_elements = not draw(st.booleans())
else:
pass_elements = True
if pass_elements:
elements = npst.from_dtype(dtype)
else:
elements = None
unique = draw(st.booleans())
fill = st.nothing() if draw(st.booleans()) else None
return pdst.column(
name=name, dtype=dtype, unique=unique, fill=fill, elements=elements)
@given(pdst.data_frames(pdst.columns(1, dtype=np.dtype('<M8[ns]'))))
def test_data_frames_with_timestamp_columns(df):
pass
@given(pdst.data_frames(pdst.columns(
['A'], dtype=float, fill=st.just(float('nan')), unique=True
)))
def test_unique_column_with_fill(df):
assert len(set(df['A'])) == len(df['A'])
@given(st.data())
def test_arbitrary_data_frames(data):
columns = data.draw(st.lists(
column_strategy(),
unique_by=lambda c: c.name if c.name is not None else float('nan')
))
try:
df = data.draw(pdst.data_frames(columns))
except Exception as e:
if type(e).__name__ == 'OutOfBoundsDatetime':
# See https://github.com/HypothesisWorks/hypothesis-python/pull/826
reject()
else:
raise
data_frame_columns = list(df)
assert len(data_frame_columns) == len(columns)
for i, (c, n) in enumerate(zip(columns, df)):
if c.name is None:
assert n == i
else:
assert c.name == n
for i, c in enumerate(columns):
column_name = data_frame_columns[i]
values = df[column_name]
if c.unique:
assert len(set(values)) == len(values)
@given(pdst.data_frames(
pdst.columns(['A'], unique=True, dtype=int),
rows=st.tuples(st.integers(0, 10)),
))
def test_can_specify_unique_with_rows(df):
column = df['A']
assert len(set(column)) == len(column)
def test_uniqueness_does_not_affect_other_rows_1():
data_frames = pdst.data_frames([
pdst.column('A', dtype=int, unique=True),
pdst.column('B', dtype=int, unique=False)],
rows=st.tuples(st.integers(0, 10), st.integers(0, 10)),
index=pdst.range_indexes(2, 2)
)
find_any(data_frames, lambda x: x['B'][0] == x['B'][1])
def test_uniqueness_does_not_affect_other_rows_2():
data_frames = pdst.data_frames([
pdst.column('A', dtype=int, unique=False),
pdst.column('B', dtype=int, unique=True)],
rows=st.tuples(st.integers(0, 10), st.integers(0, 10)),
index=pdst.range_indexes(2, 2)
)
find_any(data_frames, lambda x: x['A'][0] == x['A'][1])
@given(pdst.data_frames(
pdst.columns(['A'], dtype=int, fill=st.just(7)),
rows=st.tuples()
))
def test_will_fill_missing_columns_in_tuple_row(df):
for d in df['A']:
assert d == 7