-
Notifications
You must be signed in to change notification settings - Fork 20
/
test_api_crud.py
115 lines (90 loc) · 3.19 KB
/
test_api_crud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Integration tests which test CRUD operations on the Airbyte API.
These tests are designed to be run against a running instance of the Airbyte API.
"""
from __future__ import annotations
import os
from airbyte_api.models.shared.sourceresponse import SourceResponse
import pytest
import ulid
import airbyte as ab
from airbyte._util import api_util, api_duck_types
from airbyte_api.models.shared import SourceFaker, DestinationDevNull, DestinationDuckdb
from dotenv import dotenv_values
from airbyte.caches.duckdb import DuckDBCache
CLOUD_API_ROOT = "https://api.airbyte.com/v1"
ENV_AIRBYTE_API_KEY = "AIRBYTE_API_KEY"
ENV_AIRBYTE_API_WORKSPACE_ID = "AIRBYTE_API_WORKSPACE_ID"
ENV_MOTHERDUCK_API_KEY = "MOTHERDUCK_API_KEY"
@pytest.fixture
def workspace_id() -> str:
return os.environ[ENV_AIRBYTE_API_WORKSPACE_ID]
@pytest.fixture
def api_root() -> str:
return CLOUD_API_ROOT
@pytest.fixture
def api_key() -> str:
dotenv_vars: dict[str, str | None] = dotenv_values()
if ENV_AIRBYTE_API_KEY in dotenv_vars:
return dotenv_vars[ENV_AIRBYTE_API_KEY]
if ENV_AIRBYTE_API_KEY not in os.environ:
raise ValueError("Please set the AIRBYTE_API_KEY environment variable.")
return os.environ[ENV_AIRBYTE_API_KEY]
@pytest.fixture
def motherduck_api_key() -> str:
dotenv_vars: dict[str, str | None] = dotenv_values()
if ENV_MOTHERDUCK_API_KEY in dotenv_vars:
return dotenv_vars[ENV_MOTHERDUCK_API_KEY]
if ENV_MOTHERDUCK_API_KEY not in os.environ:
raise ValueError("Please set the AIRBYTE_API_KEY environment variable.")
return os.environ[ENV_MOTHERDUCK_API_KEY]
def test_create_and_delete_source(
workspace_id: str,
api_root: str,
api_key: str,
) -> None:
new_resource_name = "deleteme-source-faker" + str(ulid.ULID()).lower()[-6:]
source_config = SourceFaker()
source = api_util.create_source(
name=new_resource_name,
api_root=api_root,
api_key=api_key,
workspace_id=workspace_id,
config=source_config,
)
assert source.name == new_resource_name
assert source.source_type == "faker"
assert source.source_id
api_util.delete_source(
source_id=source.source_id,
api_root=api_root,
api_key=api_key,
workspace_id=workspace_id,
)
def test_create_and_delete_destination(
workspace_id: str,
api_root: str,
api_key: str,
motherduck_api_key: str,
) -> None:
new_resource_name = "deleteme-destination-faker" + str(ulid.ULID()).lower()[-6:]
destination_config = DestinationDuckdb(
destination_path="temp_db",
motherduck_api_key=motherduck_api_key,
)
destination = api_util.create_destination(
name=new_resource_name,
api_root=api_root,
api_key=api_key,
workspace_id=workspace_id,
config=destination_config,
)
assert destination.name == new_resource_name
assert destination.destination_type == "duckdb"
assert destination.destination_id
api_util.delete_destination(
destination_id=destination.destination_id,
api_root=api_root,
api_key=api_key,
workspace_id=workspace_id,
)