-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
test_sqlite.py
103 lines (85 loc) · 3.73 KB
/
test_sqlite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import os
import sqlite3
import tempfile
from contextlib import closing
import pytest
from prefect import Flow
from prefect.tasks.database import SQLiteQuery, SQLiteScript
from prefect.utilities.debug import raise_on_exception
sql_script = """
CREATE TABLE TEST (NUMBER INTEGER, DATA TEXT);
INSERT INTO TEST (NUMBER, DATA) VALUES
(11, 'first'),
(12, 'second'),
(13, 'third');
"""
@pytest.fixture(scope="module")
def database():
with tempfile.TemporaryDirectory() as tmpdir:
tmpname = os.path.join(tmpdir, "test.db")
with closing(sqlite3.connect(tmpname)) as conn:
with closing(conn.cursor()) as c:
c.executescript(sql_script)
conn.commit()
yield tmpname
class TestSQLiteQuery:
def test_sqlite_query_task_requires_db(self):
with pytest.raises(TypeError):
task = SQLiteQuery()
def test_sqlite_query_task_initializes_and_runs_basic_query(self, database):
with Flow(name="test") as f:
task = SQLiteQuery(db=database)(query="SELECT * FROM TEST")
out = f.run()
assert out.is_successful()
assert out.result[task].result == [(11, "first"), (12, "second"), (13, "third")]
def test_sqlite_query_task_initializes_with_query_and_runs(self, database):
with Flow(name="test") as f:
task = SQLiteQuery(db=database, query="SELECT * FROM TEST")()
out = f.run()
assert out.is_successful()
assert out.result[task].result == [(11, "first"), (12, "second"), (13, "third")]
def test_sqlite_error_results_in_failed_state(self, database):
with Flow(name="test") as f:
task = SQLiteQuery(db=database, query="SELECT * FROM FOOBAR")()
out = f.run()
assert out.is_failed()
assert "no such table: FOOBAR" in str(out.result[task].result)
def test_only_single_statement_queries_allowed(self, database):
query = """INSERT INTO TEST (NUMBER, DATA) VALUES\n(88, "other");\nSELECT * FROM TEST;"""
with Flow(name="test") as f:
task = SQLiteQuery(db=database, query=query)()
out = f.run()
assert out.is_failed()
assert "one statement at a time" in str(out.result[task].result)
class TestSQLiteScript:
def test_sqlite_script_task_requires_db(self):
with pytest.raises(TypeError):
task = SQLiteScript()
def test_sqlite_script_task_initializes_and_runs_basic_script(self, database):
with Flow(name="test") as f:
task = SQLiteScript(db=database)(script="SELECT * FROM TEST;")
out = f.run()
assert out.is_successful()
assert out.result[task].result is None
def test_sqlite_script_task_initializes_with_script_and_runs(self, database):
with Flow(name="test") as f:
task = SQLiteScript(db=database, script="SELECT * FROM TEST;")()
out = f.run()
assert out.is_successful()
assert out.result[task].result is None
def test_sqlite_error_results_in_failed_state(self, database):
with Flow(name="test") as f:
task = SQLiteScript(db=database)(script="SELECT * FROM FOOBAR;")
out = f.run()
assert out.is_failed()
assert "no such table: FOOBAR" in str(out.result[task].result)
def test_composition_of_tasks(database):
script = """CREATE TABLE TEST2 (NUM INTEGER, DATA TEXT); INSERT INTO TEST2 (NUM, DATA) VALUES\n(88, "other"); ALTER TABLE TEST2\n ADD status TEXT;"""
with Flow(name="test") as f:
alter = SQLiteScript(db=database)(script=script)
task = SQLiteQuery(db=database, query="SELECT * FROM TEST2")(
upstream_tasks=[alter]
)
out = f.run()
assert out.is_successful()
assert out.result[task].result == [(88, "other", None)]