# Apache Zookeeper

Координационный сервис для распределенных приложений. Логически представляет из себя двевовидную структуру, наподобие файловой системы, к каждому узлу которой можно присвоить значение. 

Запускаем его в `Docker` вместе с `Kafka`:
```bash
docker run --name kafka_sandbox -v `pwd`:/course -p 2181:2181 -p 9092:9092 spotify/kafka
```

### Подключение

In [1]:
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

### Создание элемента

In [2]:
zk.ensure_path('/node/a')
zk.set('/node/a', b'hello')

ZnodeStat(czxid=40, mzxid=151, ctime=1542385610032, mtime=1542387834974, version=4, cversion=27, aversion=0, ephemeralOwner=0, dataLength=5, numChildren=1, pzxid=82)

можно иначе

In [6]:
zk.create('/node/b', b'hello2')

'/node/b'

### Отслеживание изменений

In [7]:
def callback(p):    
    print('Event detected: ', p)
    
zk.get('/node/a', callback)
zk.set('/node/a', b'hello2')

Event detected:  WatchedEvent(type='CHANGED', state='CONNECTED', path='/node/a')


ZnodeStat(czxid=40, mzxid=157, ctime=1542385610032, mtime=1542387874709, version=6, cversion=27, aversion=0, ephemeralOwner=0, dataLength=6, numChildren=1, pzxid=82)

### Транзакции

In [8]:
with zk.transaction() as t:
    t.create('/node/c', b'c value')
    t.create('/node/d', b'd value')
zk.get('/node/c')    

(b'c value',
 ZnodeStat(czxid=45, mzxid=45, ctime=1542385662063, mtime=1542385662063, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=7, numChildren=0, pzxid=45))

### Блокировки

In [9]:
import threading
import time

def thread_func(num):
    with zk.Lock('/node/a') as lock:
        print('Thread #{} lock'.format(num))
        zk.ensure_path('/node/a/c')
        zk.set('/node/a/c', b'hello')
        time.sleep(2)
        print('Thread #{} unlock'.format(num))

for i in range(3):
    threading.Thread(target=thread_func, args=(i,)).start()

Thread #0 lock
Thread #0 unlock
Thread #1 lock
Thread #1 unlock
Thread #2 lock
Thread #2 unlock


### Election

In [10]:
def thread_func(num):
    election = zk.Election("/node")
    
    def election_func():
        print('Election won: {}'.format(num))
    
    election.run(election_func)


for i in range(3):
    threading.Thread(target=thread_func, args=(i,)).start()

Election won: 0
Election won: 1
Election won: 2


### Очереди

In [19]:
if zk.exists('/queue'):
    zk.delete('/queue', recursive=True)
    
q = zk.Queue('/queue')
q.put(b'1', 10)
q.put(b'2', 0)

In [12]:
q.get()

b'2'

### Счетчики

In [21]:
if zk.exists('/counter'):
    zk.delete('/counter', recursive=True)
    
counter = zk.Counter('/counter')
counter += 20
counter -= 5
counter.value

15

Connection dropped: socket connection broken
Transition to CONNECTING
Connection dropped: socket connection broken
Connection dropped: socket connection broken
Connection dropped: socket connection broken
Connection dropped: socket connection broken
Connection dropped: socket connection error: Connection refused
Connection dropped: socket connection error: Connection refused
Connection dropped: socket connection error: Connection refused
Connection dropped: socket connection error: Connection refused
Connection dropped: socket connection error: Connection refused
Connection dropped: socket connection error: Connection refused
Connection dropped: socket connection broken
Connection dropped: socket connection broken
Connection dropped: socket connection broken
Connection dropped: socket connection broken
