-
Notifications
You must be signed in to change notification settings - Fork 233
/
test_user_defined_functions.py
82 lines (61 loc) · 2.19 KB
/
test_user_defined_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from optimus.tests.base import TestBase
class TestUDFPandas(TestBase):
    """Check that a user-defined haversine function can populate a new column.

    This class defines no ``config`` of its own (presumably it runs on the
    default engine chosen by ``TestBase`` -- confirm there).
    """

    # Data-loading request: a file path and a row count. The row count (20)
    # matches the 20 expected distances asserted in ``test_udf``.
    # Presumably consumed by TestBase to build ``self.df`` -- verify there.
    load = {"path": "examples/data/DCIGNP2AYL.txt", "n_rows": 20}

    def test_udf(self):
        # Described with comments instead of a docstring so unittest keeps
        # reporting the test by its method name.
        #
        # Builds a "distance" column from the fixed point (40.671, -73.985)
        # to each row's latitude/longitude, then checks that the column
        # exists and holds the expected values.
        from optimus.functions import F

        frame = self.df

        def haversine(lat1, lon1, lat2, lon2):
            """Return the haversine distance between two coordinate pairs.

            Inputs are converted with ``F.radians`` before use; the result is
            the central angle scaled by the constant 3959 (presumably the
            Earth's radius in miles, given the original naming).
            """
            radius_miles = 3959

            # Convert every coordinate, in argument order.
            phi1, lam1, phi2, lam2 = (
                F.radians(angle) for angle in (lat1, lon1, lat2, lon2)
            )

            delta_phi = phi2 - phi1
            delta_lam = lam2 - lam1

            # Haversine term: sin^2(dphi/2) + cos(phi1)*cos(phi2)*sin^2(dlam/2)
            lat_term = F.sin(delta_phi / 2) ** 2
            cos_product = F.cos(phi1) * F.cos(phi2)
            lon_term = F.sin(delta_lam / 2) ** 2
            hav = lat_term + cos_product * lon_term

            central_angle = 2 * F.asin(F.sqrt(hav))
            return radius_miles * central_angle

        frame["distance"] = haversine(
            40.671, -73.985, frame['latitude'], frame['longitude']
        )

        self.assertIn("distance", frame.cols.names())

        expected_distances = [
            139.6071899, 139.7468976, 142.1910502, 137.275546,
            139.6845834, 142.511427, 139.2236552, 137.5044141,
            141.9774911, 138.9195547, 141.1156307, 137.320386,
            136.9785681, 139.8973388, 142.0501961, 141.0701359,
            141.6853331, 141.7333734, 139.0531985, 139.5583856,
        ]
        expected_dict = {"distance": expected_distances}

        # ``equals`` is a project API; the flags are passed through unchanged.
        matches = frame["distance"].equals(
            expected_dict, decimal=True, assertion=True
        )
        self.assertTrue(matches)
class TestUDFDask(TestUDFPandas):
    """Re-run the inherited UDF test with the 'dask' engine, one partition."""

    config = dict(engine='dask', n_partitions=1)
class TestUDFPartitionDask(TestUDFPandas):
    """Re-run the inherited UDF test with the 'dask' engine, two partitions."""

    config = dict(engine='dask', n_partitions=2)
# Optional engine: the test class is only defined when `cudf` imports cleanly.
try:
    import cudf # pyright: reportMissingImports=false
except Exception:
    # Any import-time failure still skips this engine, as before, but
    # KeyboardInterrupt / SystemExit are no longer swallowed the way the
    # former bare `except:` did.
    pass
else:
    class TestUDFCUDF(TestUDFPandas):
        """Re-run the inherited UDF test with the 'cudf' engine."""

        config = {'engine': 'cudf'}
# Optional engine: the test class is only defined when `dask_cudf` imports
# cleanly.
try:
    import dask_cudf # pyright: reportMissingImports=false
except Exception:
    # Any import-time failure still skips this engine, as before, but
    # KeyboardInterrupt / SystemExit are no longer swallowed the way the
    # former bare `except:` did.
    pass
else:
    class TestUDFDC(TestUDFPandas):
        """Re-run the inherited UDF test with 'dask_cudf', one partition."""

        config = {'engine': 'dask_cudf', 'n_partitions': 1}
# Optional engine: the test class is only defined when `dask_cudf` imports
# cleanly.
try:
    import dask_cudf # pyright: reportMissingImports=false
except Exception:
    # Any import-time failure still skips this engine, as before, but
    # KeyboardInterrupt / SystemExit are no longer swallowed the way the
    # former bare `except:` did.
    pass
else:
    class TestUDFPartitionDC(TestUDFPandas):
        """Re-run the inherited UDF test with 'dask_cudf', two partitions."""

        config = {'engine': 'dask_cudf', 'n_partitions': 2}
# Optional engine: the test class is only defined when `pyspark` imports
# cleanly.
try:
    import pyspark
except Exception:
    # Any import-time failure still skips this engine, as before, but
    # KeyboardInterrupt / SystemExit are no longer swallowed the way the
    # former bare `except:` did.
    pass
else:
    class TestUDFSpark(TestUDFPandas):
        """Re-run the inherited UDF test with the 'spark' engine."""

        config = {'engine': 'spark'}
# Optional engine: the test class is only defined when `vaex` imports cleanly.
try:
    import vaex
except Exception:
    # Any import-time failure still skips this engine, as before, but
    # KeyboardInterrupt / SystemExit are no longer swallowed the way the
    # former bare `except:` did.
    pass
else:
    class TestUDFVaex(TestUDFPandas):
        """Re-run the inherited UDF test with the 'vaex' engine."""

        config = {'engine': 'vaex'}