This repository has been archived by the owner on Aug 18, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 23
/
zookeeper_test.py
155 lines (133 loc) · 4.48 KB
/
zookeeper_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# -*- coding: utf-8 -*-
# Copyright 2016 Yelp Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from __future__ import unicode_literals
import sys
import mock
import pytest
from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout
from data_pipeline.environment_configs import IS_OPEN_SOURCE_MODE
from data_pipeline.zookeeper import ZK
from data_pipeline.zookeeper import ZKLock
class TestZK(object):
@property
def fake_namespace(self):
return "test_namespace"
@property
def fake_name(self):
return "test_name"
@pytest.fixture
def zk_client(self):
return mock.Mock(autospec=KazooClient)
@pytest.yield_fixture
def patch_zk(self, zk_client):
with mock.patch.object(
ZK,
'get_kazoo_client',
return_value=zk_client
) as mock_get_kazoo:
yield mock_get_kazoo
@pytest.fixture
def mock_zk(self, patch_zk, zk_client):
return ZK()
def test_create_close(self, mock_zk, zk_client):
assert not zk_client.stop.call_count
mock_zk.close()
self._check_zk(zk_client)
def _check_zk(self, zk_client):
assert zk_client.start.call_count == 1
assert zk_client.stop.call_count == 1
assert zk_client.close.call_count == 1
class TestZKLock(TestZK):
@property
def lock_path(self):
return "/{} - {}".format(self.fake_name, self.fake_namespace)
@pytest.fixture
def bad_lock(self):
bad_lock = mock.Mock()
bad_lock.acquire = mock.Mock(side_effect=LockTimeout('Test exception'))
return bad_lock
@pytest.fixture
def locked_zk_client(self, bad_lock):
zk_client = mock.Mock(autospec=(KazooClient))
zk_client.Lock = mock.Mock(return_value=bad_lock)
return zk_client
@pytest.yield_fixture
def locked_patch_zk(self, locked_zk_client):
with mock.patch.object(
ZKLock,
'get_kazoo_client',
return_value=locked_zk_client
) as mock_get_kazoo:
yield mock_get_kazoo
@pytest.yield_fixture
def patch_exit(self):
with mock.patch.object(
sys,
'exit'
) as mock_exit:
yield mock_exit
def test_setup_lock_and_close(
self,
zk_client,
patch_zk
):
with ZKLock(self.fake_name, self.fake_namespace):
self._check_mid_lock(zk_client)
self._check_zk_lock(zk_client)
def test_lock_exception(
self,
locked_zk_client,
locked_patch_zk,
patch_exit
):
with ZKLock(self.fake_name, self.fake_namespace):
pass
assert patch_exit.call_count == 1
assert locked_zk_client.start.call_count == 1
# Since sys.exit is mocked, it calls close twice
assert locked_zk_client.stop.call_count >= 1
assert locked_zk_client.close.call_count >= 1
@pytest.mark.skipif(
IS_OPEN_SOURCE_MODE,
reason="This test is acquiring lock on local ZK instance and not on the mock."
)
def test_double_lock(
self,
patch_exit
):
# TODO (DATAPIPE-2122|abrar): this test is aqcuiring
# lock on actual zookeeper instance, needs to be fixed.
with ZKLock(self.fake_name, self.fake_namespace):
assert patch_exit.call_count == 0
with ZKLock(self.fake_name, self.fake_namespace):
pass
assert patch_exit.call_count == 1
def _check_zk_lock(self, zk_client):
super(TestZKLock, self)._check_zk(zk_client)
zk_client.Lock.assert_called_once_with(
self.lock_path,
self.fake_namespace
)
def _check_mid_lock(self, zk_client):
zk_client.Lock.assert_called_once_with(
self.lock_path,
self.fake_namespace
)
assert zk_client.start.call_count == 1
assert zk_client.stop.call_count == 0
assert zk_client.close.call_count == 0