test_partial.py
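"""Tests for dask_ml._partial.fit/predict and the Incremental wrapper.

Covers fitting and prediction on dask arrays, dataframes, and bags,
block shuffling behaviour, lazy (compute=False) fitting, and the error
raised for estimators without partial_fit.
"""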
import dask
import dask.array as da
import dask.bag as db
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pytest
from dask.delayed import Delayed
from sklearn.base import clone
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import SGDClassifier

import dask_ml.feature_extraction.text
from dask_ml._partial import fit, predict
from dask_ml.datasets import make_classification
from dask_ml.wrappers import Incremental

# Small in-memory arrays and their dask counterparts shared across the tests.
x = np.array([[1, 0], [2, 0], [3, 0], [4, 0], [0, 1], [0, 2], [3, 3], [4, 4]])
y = np.array([1, 1, 1, 1, -1, -1, 0, 0])
z = np.array([[1, -1], [-1, 1], [10, -10], [-10, 10]])

X = da.from_array(x, chunks=(3, 2))
Y = da.from_array(y, chunks=(3,))
Z = da.from_array(z, chunks=(2, 2))


def test_fit():
    with dask.config.set(scheduler="single-threaded"):
        sgd = SGDClassifier(max_iter=5, tol=1e-3)
        sgd = fit(sgd, X, Y, classes=np.array([-1, 0, 1]))

        sol = sgd.predict(z)
        result = predict(sgd, Z)
        assert result.chunks == ((2, 2),)
        assert result.compute().tolist() == sol.tolist()


def test_no_compute():
    # With compute=False, fit returns a lazy Delayed instead of a fitted estimator.
    sgd = SGDClassifier(max_iter=5, tol=1e-3)
    result = fit(sgd, X, Y, classes=np.array([-1, 0, 1]), compute=False)
    assert isinstance(result, Delayed)


def test_fit_rechunking():
    # Fitting should work even when X is chunked along the feature axis.
    n_classes = 2
    X, y = make_classification(chunks=20, n_classes=n_classes)
    X = X.rechunk({1: 10})

    assert X.numblocks[1] > 1

    clf = Incremental(SGDClassifier(max_iter=5, tol=1e-3))
    clf.fit(X, y, classes=list(range(n_classes)))


def test_fit_shuffle_blocks():
    # With one row per block, shuffling the blocks changes the order in which
    # partial_fit sees the data, so different seeds give different coefficients.
    N = 10
    X = da.from_array(1 + np.arange(N).reshape(-1, 1), chunks=1)
    y = da.from_array(np.ones(N), chunks=1)
    classes = [0, 1]

    sgd = SGDClassifier(
        max_iter=5, random_state=0, fit_intercept=False, shuffle=False, tol=1e-3
    )

    sgd1 = fit(clone(sgd), X, y, random_state=0, classes=classes)
    sgd2 = fit(clone(sgd), X, y, random_state=42, classes=classes)
    assert len(sgd1.coef_) == len(sgd2.coef_) == 1
    assert not np.allclose(sgd1.coef_, sgd2.coef_)

    # With shuffle_blocks=False the seed is irrelevant and the results match.
    X, y = make_classification(random_state=0, chunks=20)
    sgd_a = fit(clone(sgd), X, y, random_state=0, classes=classes, shuffle_blocks=False)
    sgd_b = fit(
        clone(sgd), X, y, random_state=42, classes=classes, shuffle_blocks=False
    )
    assert np.allclose(sgd_a.coef_, sgd_b.coef_)

    # A dask RandomState cannot be used to seed the block shuffle.
    with pytest.raises(ValueError, match="cannot be used to seed"):
        fit(
            sgd,
            X,
            y,
            classes=np.array([-1, 0, 1]),
            shuffle_blocks=True,
            random_state=da.random.RandomState(42),
        )


def test_dataframes():
    df = pd.DataFrame({"x": range(10), "y": [0, 1] * 5})
    ddf = dd.from_pandas(df, npartitions=2)

    with dask.config.set(scheduler="single-threaded"):
        sgd = SGDClassifier(max_iter=5, tol=1e-3)
        sgd = fit(sgd, ddf[["x"]], ddf.y, classes=[0, 1])

        sol = sgd.predict(df[["x"]])
        result = predict(sgd, ddf[["x"]])
        da.utils.assert_eq(sol, result)


def test_bag():
    x = db.from_sequence(range(10), npartitions=2)
    vect = dask_ml.feature_extraction.text.HashingVectorizer()
    vect = fit(vect, x, None)
    y = vect.transform(x)
    assert y.shape[1] == vect.n_features


def test_no_partial_fit_raises():
    X, y = make_classification(chunks=50)
    with pytest.raises(ValueError, match="RandomForestClassifier"):
        fit(RandomForestClassifier(), X, y)