-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
conftest.py
141 lines (108 loc) · 4.61 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
from unittest.mock import Mock
import pytest
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from source_salesforce.api import Salesforce
from source_salesforce.source import SourceSalesforce
@pytest.fixture(autouse=True)
def time_sleep_mock(mocker):
time_mock = mocker.patch("time.sleep", lambda x: None)
yield time_mock
@pytest.fixture(scope="module")
def bulk_catalog():
with open("unit_tests/bulk_catalog.json") as f:
data = json.loads(f.read())
return ConfiguredAirbyteCatalog.parse_obj(data)
@pytest.fixture(scope="module")
def rest_catalog():
with open("unit_tests/rest_catalog.json") as f:
data = json.loads(f.read())
return ConfiguredAirbyteCatalog.parse_obj(data)
@pytest.fixture(scope="module")
def state():
state = {"Account": {"LastModifiedDate": "2021-10-01T21:18:20.000Z"}, "Asset": {"SystemModstamp": "2021-10-02T05:08:29.000Z"}}
return state
@pytest.fixture(scope="module")
def stream_config():
"""Generates streams settings for BULK logic"""
return {
"client_id": "fake_client_id",
"client_secret": "fake_client_secret",
"refresh_token": "fake_refresh_token",
"start_date": "2010-01-18T21:18:20Z",
"is_sandbox": False,
"wait_timeout": 15,
}
@pytest.fixture(scope="module")
def stream_config_date_format():
"""Generates streams settings with `start_date` in format YYYY-MM-DD"""
return {
"client_id": "fake_client_id",
"client_secret": "fake_client_secret",
"refresh_token": "fake_refresh_token",
"start_date": "2010-01-18",
"is_sandbox": False,
"wait_timeout": 15,
}
@pytest.fixture(scope="module")
def stream_config_without_start_date():
"""Generates streams settings for REST logic without start_date"""
return {
"client_id": "fake_client_id",
"client_secret": "fake_client_secret",
"refresh_token": "fake_refresh_token",
"is_sandbox": False,
"wait_timeout": 15,
}
def _stream_api(stream_config, describe_response_data=None):
sf_object = Salesforce(**stream_config)
sf_object.login = Mock()
sf_object.access_token = Mock()
sf_object.instance_url = "https://fase-account.salesforce.com"
response_data = {"fields": [{"name": "LastModifiedDate", "type": "string"}]}
if describe_response_data:
response_data = describe_response_data
sf_object.describe = Mock(return_value=response_data)
return sf_object
@pytest.fixture(scope="module")
def stream_api(stream_config):
return _stream_api(stream_config)
@pytest.fixture(scope="module")
def stream_api_v2(stream_config):
describe_response_data = {"fields": [{"name": "LastModifiedDate", "type": "string"}, {"name": "BillingAddress", "type": "address"}]}
return _stream_api(stream_config, describe_response_data=describe_response_data)
@pytest.fixture(scope="module")
def stream_api_pk(stream_config):
describe_response_data = {"fields": [{"name": "LastModifiedDate", "type": "string"}, {"name": "Id", "type": "string"}]}
return _stream_api(stream_config, describe_response_data=describe_response_data)
@pytest.fixture(scope="module")
def stream_api_v2_too_many_properties(stream_config):
describe_response_data = {
"fields": [{"name": f"Property{str(i)}", "type": "string"} for i in range(Salesforce.REQUEST_SIZE_LIMITS)]
}
describe_response_data["fields"].extend([{"name": "BillingAddress", "type": "address"}])
return _stream_api(stream_config, describe_response_data=describe_response_data)
@pytest.fixture(scope="module")
def stream_api_v2_pk_too_many_properties(stream_config):
describe_response_data = {
"fields": [{"name": f"Property{str(i)}", "type": "string"} for i in range(Salesforce.REQUEST_SIZE_LIMITS)]
}
describe_response_data["fields"].extend([
{"name": "BillingAddress", "type": "address"}, {"name": "Id", "type": "string"}
])
return _stream_api(stream_config, describe_response_data=describe_response_data)
def generate_stream(stream_name, stream_config, stream_api):
return SourceSalesforce.generate_streams(stream_config, {stream_name: None}, stream_api)[0]
def encoding_symbols_parameters():
return [(x, "ISO-8859-1", b'"\xc4"\n,"4"\n\x00,"\xca \xfc"', [{"Ä": "4"}, {"Ä": "Ê ü"}]) for x in range(1, 11)] + [
(
x,
"utf-8",
b'"\xd5\x80"\n "\xd5\xaf","\xd5\xaf"\n\x00,"\xe3\x82\x82 \xe3\x83\xa4 \xe3\x83\xa4 \xf0\x9d\x9c\xb5"',
[{"Հ": "կ"}, {"Հ": "も ヤ ヤ 𝜵"}],
)
for x in range(1, 11)
]