-
Notifications
You must be signed in to change notification settings - Fork 13.6k
/
test_azure_cosmos.py
73 lines (61 loc) · 2.71 KB
/
test_azure_cosmos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import json
import unittest
import uuid
import mock
from airflow.models import Connection
from airflow.providers.microsoft.azure.operators.azure_cosmos import AzureCosmosInsertDocumentOperator
from airflow.utils import db
class TestAzureCosmosDbHook(unittest.TestCase):
# Set up an environment to test with
def setUp(self):
# set up some test variables
self.test_end_point = 'https://test_endpoint:443'
self.test_master_key = 'magic_test_key'
self.test_database_name = 'test_database_name'
self.test_collection_name = 'test_collection_name'
db.merge_conn(
Connection(
conn_id='azure_cosmos_test_key_id',
conn_type='azure_cosmos',
login=self.test_end_point,
password=self.test_master_key,
extra=json.dumps({'database_name': self.test_database_name,
'collection_name': self.test_collection_name})
)
)
@mock.patch('airflow.providers.microsoft.azure.hooks.azure_cosmos.CosmosClient')
def test_insert_document(self, cosmos_mock):
test_id = str(uuid.uuid4())
cosmos_mock.return_value.CreateItem.return_value = {'id': test_id}
op = AzureCosmosInsertDocumentOperator(
database_name=self.test_database_name,
collection_name=self.test_collection_name,
document={'id': test_id, 'data': 'sometestdata'},
azure_cosmos_conn_id='azure_cosmos_test_key_id',
task_id='azure_cosmos_sensor')
expected_calls = [mock.call().CreateItem(
'dbs/' + self.test_database_name + '/colls/' + self.test_collection_name,
{'data': 'sometestdata', 'id': test_id})]
op.execute(None)
cosmos_mock.assert_any_call(self.test_end_point, {'masterKey': self.test_master_key})
cosmos_mock.assert_has_calls(expected_calls)
if __name__ == '__main__':
unittest.main()