-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
test_datasource.py
88 lines (76 loc) · 2.29 KB
/
test_datasource.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from __future__ import annotations
import pathlib
from typing import TYPE_CHECKING, Callable, Final
import pact
import pytest
from tests.integration.cloud.rest_contracts.conftest import (
EXISTING_ORGANIZATION_ID,
ContractInteraction,
)
if TYPE_CHECKING:
from tests.integration.cloud.rest_contracts.conftest import PactBody
# Fixed UUIDs used to build request paths for the Data Source contract tests.
# NOTE(review): NON_EXISTENT_DATASOURCE_ID is not referenced by any interaction
# visible in this module — confirm whether it is still needed.
NON_EXISTENT_DATASOURCE_ID: Final[str] = "6ed9a340-8469-4ee2-a300-ffbe5d09b49d"
# Last path segment of the GET interaction's request path below.
EXISTING_DATASOURCE_ID: Final[str] = "15da041b-328e-44f7-892e-2bfd1a887ef8"
# Minimal response body for the "add a Data Source" (POST) interaction.
# The payload under "data" is wrapped in ``pact.Like`` and its "id" uses the
# pact UUID format matcher; "datasource_config" is left as an empty mapping.
# NOTE(review): the only interaction that uses this body is currently
# commented out in the parametrize list of ``test_datasource``.
POST_DATASOURCE_MIN_RESPONSE_BODY: Final[PactBody] = {
    "data": pact.Like(
        {
            "id": pact.Format().uuid,
            "attributes": {"datasource_config": {}},
        },
    ),
}
# Minimal response body for the "get a Data Source" (GET) interaction.
# The payload under "data" is wrapped in ``pact.Like``: a UUID-formatted "id",
# the type "pandas", and a "datasource_config" whose "assets" list holds a
# single empty mapping.
GET_DATASOURCE_MIN_RESPONSE_BODY: Final[PactBody] = {
    "data": pact.Like(
        {
            "id": pact.Format().uuid,
            "type": "pandas",
            "attributes": {
                "datasource_config": {"assets": [{}]},
            },
        },
    ),
}
@pytest.mark.cloud
@pytest.mark.parametrize(
    "contract_interaction",
    [
        # NOTE(review): the POST interaction below is disabled; the reason is
        # not recorded in this file. Kept verbatim so it can be re-enabled.
        #
        # ContractInteraction(
        #     method="POST",
        #     request_path=pathlib.Path(
        #         "/",
        #         "organizations",
        #         EXISTING_ORGANIZATION_ID,
        #         "datasources",
        #     ),
        #     upon_receiving="a request to add a Data Source",
        #     given="the Data Source does not exist",
        #     response_status=200,
        #     response_body=POST_DATASOURCE_MIN_RESPONSE_BODY,
        # ),
        ContractInteraction(
            method="GET",
            request_path=pathlib.Path(
                "/",
                "organizations",
                EXISTING_ORGANIZATION_ID,
                "datasources",
                EXISTING_DATASOURCE_ID,
            ),
            upon_receiving="a request to get a Data Source",
            given="the Data Source exists",
            response_status=200,
            response_body=GET_DATASOURCE_MIN_RESPONSE_BODY,
        ),
    ],
)
def test_datasource(
    contract_interaction: ContractInteraction,
    run_rest_api_pact_test: Callable[[ContractInteraction], None],
) -> None:
    """Run each parametrized Data Source contract interaction.

    Args:
        contract_interaction: The interaction supplied by the parametrize
            list above (currently only the GET of an existing Data Source).
        run_rest_api_pact_test: Callable fixture that is handed the
            interaction; what it verifies is defined outside this module.
    """
    run_rest_api_pact_test(contract_interaction)