In [1]:
import hazelcast
import time
import os
import random
import multiprocessing

In [2]:
# Connection settings shared by every Hazelcast client in this notebook;
# they are unpacked as keyword arguments into hazelcast.HazelcastClient(...).
cluster_settings = dict(
    cluster_name='dev',
    cluster_members=[
        'hazelcast_node_{}:5701'.format(node_index)
        for node_index in (1, 2, 3)
    ],
)


#### Task 1

Install and set up Hazelcast.

*Docker will be used.*


#### Task 2

Configure a 3-node Hazelcast cluster.

*Hazelcast is configured as a 3-node cluster with Docker and docker-compose.*

Files:
- docker-compose.yml
- hazelcast.yml

#### Task 3

Demonstrate Distributed Map operations

In [3]:
# Open a client connection to the cluster described by `cluster_settings`.
# NOTE(review): no shutdown() call for this client is visible before the name
# `client` is rebound in the Task 4 cells - confirm it is closed somewhere.
client = hazelcast.HazelcastClient(**cluster_settings)

Create Distributed Map using API

In [4]:
# Obtain the distributed map proxy in blocking mode.
# NOTE(review): the name `map` shadows Python's built-in map() for the rest of
# the notebook session; later cells rely on this name, so it is kept as-is.
map = client.get_map("my-distributed-map").blocking()

Put 1000 values into distributed map using keys from 1 to 1000

In [5]:
# Store the square of every key from 1 to 1000 (inclusive) in the distributed map.
FIRST_KEY, LAST_KEY = 1, 1000
for i in range(FIRST_KEY, LAST_KEY + 1):
    map.put(i, i * i)

Check sharding of values between nodes using Management Center

![sharding_between_nodes](screenshots/management_centre_dashboard_3_nodes.PNG)
![records_distribution](screenshots/management_centre_records_distribution_3_nodes.PNG)

Check how the sharding changes when one or two nodes are absent. Is there data loss?

![sharding_between_nodes](screenshots/management_centre_dashboard_2_nodes.PNG)
![records_distribution](screenshots/management_centre_records_distribution_2_nodes.PNG)
![sharding_between_nodes](screenshots/management_centre_dashboard_1_node.PNG)
![records_distribution](screenshots/management_centre_records_distribution_1_node.PNG)

#### Task 4

Demonstrate Distributed Map operations with Locks


Using 3 connections simultaneously, run the following examples in a loop (http://docs.hazelcast.org/docs/latest/manual/html-single/index.html#locking-maps):

a) no locking

In [6]:
def no_locking(cluster_settings):
    """Increment a shared map entry 100 times without any synchronisation.

    Opens its own Hazelcast client, then repeatedly reads key ``1`` of
    ``no_locking_map``, sleeps for 5 seconds and writes back the value it
    read plus one. The read-modify-write sequence is deliberately
    unprotected, so concurrent workers can overwrite each other's
    increments.

    Args:
        cluster_settings: keyword arguments forwarded to
            ``hazelcast.HazelcastClient``.

    Raises:
        TypeError: if the entry for key ``1`` is missing (``get`` returns
            ``None``, which cannot be incremented).
    """
    client = hazelcast.HazelcastClient(**cluster_settings)
    # try/finally guarantees the client connection is released even when an
    # iteration fails part-way through.
    try:
        counter_map = client.get_map('no_locking_map').blocking()
        pid = os.getpid()
        start_time = time.time()
        key = 1
        print(f'Starting. PID: {pid}')
        for k in range(100):
            if k % 10 == 0:
                print(f'At: {k}. PID: {pid}')
            # Unprotected read -> pause -> write: other workers may update the
            # entry during the sleep, and their update is then overwritten.
            value = counter_map.get(key)
            time.sleep(5)
            counter_map.put(key, value + 1)
        end_time = time.time()
        print('Finished! Result = {}. Processing took {:.2f} seconds. PID: {}'.format(
            counter_map.get(key),
            end_time - start_time,
            pid
        ))
    finally:
        client.shutdown()

# Seed the shared entry so that every worker starts from a known value.
client = hazelcast.HazelcastClient(**cluster_settings)
map = client.get_map('no_locking_map').blocking()
map.put(1, 500)
client.shutdown()

# One worker process per simulated connection.
processes = [
    multiprocessing.Process(target=no_locking, args=(cluster_settings,))
    for worker_index in range(3)
]

print(f'Starting {len(processes)} processes!')

start_time = time.time()

# Launch the workers one second apart.
for process in processes:
    time.sleep(1)
    process.start()

print('All processes are started!')

# Block until every worker has finished its loop.
for process in processes:
    process.join()

end_time = time.time()

print('All processes are finished. Processing took {:.2f} seconds.'.format(end_time - start_time))

# Read the final value back through a fresh client.
client = hazelcast.HazelcastClient(**cluster_settings)
map = client.get_map('no_locking_map').blocking()
result = map.get(1)
client.shutdown()

print(f'After all gets and puts record is (1: {result})')

Starting 3 processes!
Starting. PID: 464
At: 0. PID: 464
Starting. PID: 474
At: 0. PID: 474
Starting. PID: 484
At: 0. PID: 484
All processes are started!
At: 10. PID: 464
At: 10. PID: 474
At: 10. PID: 484
At: 20. PID: 464
At: 20. PID: 474
At: 20. PID: 484
At: 30. PID: 464
At: 30. PID: 474
At: 30. PID: 484
At: 40. PID: 464
At: 40. PID: 474
At: 40. PID: 484
At: 50. PID: 464
At: 50. PID: 474
At: 50. PID: 484
At: 60. PID: 464
At: 60. PID: 474
At: 60. PID: 484
At: 70. PID: 464
At: 70. PID: 474
At: 70. PID: 484
At: 80. PID: 464
At: 80. PID: 474
At: 80. PID: 484
At: 90. PID: 464
At: 90. PID: 474
At: 90. PID: 484
Finished! Result = 600. Processing took 500.93 seconds. PID: 464
Finished! Result = 600. Processing took 500.93 seconds. PID: 474
Finished! Result = 600. Processing took 500.95 seconds. PID: 484
All processes are finished. Processing took 504.03 seconds.
After all gets and puts record is (1: 600)


b) pessimistic locking

In [7]:
def pesimistic_locking(cluster_settings):
    """Increment a shared map entry 100 times while holding the key's lock.

    Opens its own Hazelcast client and, for each iteration, locks key ``1``
    of ``pesimistic_locking_map``, reads the value, sleeps for 5 seconds,
    writes back the value plus one and unlocks the key. The lock is held for
    the whole read-modify-write sequence, including the sleep.

    Args:
        cluster_settings: keyword arguments forwarded to
            ``hazelcast.HazelcastClient``.

    Raises:
        TypeError: if the entry for key ``1`` is missing (``get`` returns
            ``None``, which cannot be incremented).
    """
    client = hazelcast.HazelcastClient(**cluster_settings)
    # try/finally guarantees the client connection is released even when an
    # iteration fails part-way through.
    try:
        counter_map = client.get_map('pesimistic_locking_map').blocking()
        pid = os.getpid()
        start_time = time.time()
        key = 1
        print(f'Starting. PID: {pid}')
        for k in range(100):
            if k % 10 == 0:
                print(f'At: {k}. PID: {pid}')
            counter_map.lock(key)
            try:
                value = counter_map.get(key)
                time.sleep(5)
                counter_map.put(key, value + 1)
            finally:
                # Always release the key, otherwise the other workers would
                # stay blocked in lock().
                counter_map.unlock(key)
        end_time = time.time()
        print('Finished! Result = {}. Processing took {:.2f} seconds. PID: {}'.format(
            counter_map.get(key),
            end_time - start_time,
            pid
        ))
    finally:
        client.shutdown()

# Seed the shared entry so that every worker starts from a known value.
client = hazelcast.HazelcastClient(**cluster_settings)
map = client.get_map('pesimistic_locking_map').blocking()
map.put(1, 500)
client.shutdown()

# One worker process per simulated connection.
processes = [
    multiprocessing.Process(target=pesimistic_locking, args=(cluster_settings,))
    for worker_index in range(3)
]

print(f'Starting {len(processes)} processes!')

start_time = time.time()

for process in processes:
    process.start()

print('All processes are started!')

# Block until every worker has finished its loop.
for process in processes:
    process.join()

end_time = time.time()

print('All processes are finished. Processing took {:.2f} seconds.'.format(end_time - start_time))

# Read the final value back through a fresh client.
client = hazelcast.HazelcastClient(**cluster_settings)
map = client.get_map('pesimistic_locking_map').blocking()
result = map.get(1)
client.shutdown()

print(f'After all gets and puts record is (1: {result})')

Starting 3 processes!
Starting. PID: 616
At: 0. PID: 616
Starting. PID: 618
At: 0. PID: 618
Starting. PID: 620
At: 0. PID: 620
All processes are started!
At: 10. PID: 616
At: 10. PID: 618
At: 10. PID: 620
At: 20. PID: 616
At: 20. PID: 618
At: 20. PID: 620
At: 30. PID: 616
At: 30. PID: 618
At: 30. PID: 620
At: 40. PID: 616
At: 40. PID: 618
At: 40. PID: 620
At: 50. PID: 616
At: 50. PID: 618
At: 50. PID: 620
At: 60. PID: 616
At: 60. PID: 618
At: 60. PID: 620
At: 70. PID: 616
At: 70. PID: 618
At: 70. PID: 620
At: 80. PID: 616
At: 80. PID: 618
At: 80. PID: 620
At: 90. PID: 616
At: 90. PID: 618
At: 90. PID: 620
Finished! Result = 798. Processing took 1493.42 seconds. PID: 616
Finished! Result = 799. Processing took 1498.41 seconds. PID: 618
Finished! Result = 800. Processing took 1503.37 seconds. PID: 620
All processes are finished. Processing took 1503.52 seconds.
After all gets and puts record is (1: 800)


c) optimistic locking

In [9]:
def optimistic_locking(cluster_settings):
    """Increment a shared map entry 100 times using compare-and-replace.

    Opens its own Hazelcast client and, for each of the 100 increments,
    reads key ``1`` of ``optimistic_locking_map``, sleeps for 5 seconds and
    calls ``replace_if_same(key, value, value + 1)``. The attempt is repeated
    until that call returns a truthy result.

    Args:
        cluster_settings: keyword arguments forwarded to
            ``hazelcast.HazelcastClient``.

    Raises:
        TypeError: if the entry for key ``1`` is missing (``get`` returns
            ``None``, which cannot be incremented).
    """
    client = hazelcast.HazelcastClient(**cluster_settings)
    # try/finally guarantees the client connection is released even when an
    # iteration fails part-way through.
    try:
        counter_map = client.get_map('optimistic_locking_map').blocking()
        pid = os.getpid()
        start_time = time.time()
        key = 1
        print(f'Starting. PID: {pid}')
        for k in range(100):
            if k % 10 == 0:
                print(f'At: {k}. PID: {pid}')
            # Retry the whole read -> pause -> conditional write until the
            # conditional replace reports success.
            while True:
                value = counter_map.get(key)
                time.sleep(5)
                if counter_map.replace_if_same(key, value, value + 1):
                    break
        end_time = time.time()
        print('Finished! Result = {}. Processing took {:.2f} seconds. PID: {}'.format(
            counter_map.get(key),
            end_time - start_time,
            pid
        ))
    finally:
        client.shutdown()

# Seed the shared entry so that every worker starts from a known value.
client = hazelcast.HazelcastClient(**cluster_settings)
map = client.get_map('optimistic_locking_map').blocking()
map.put(1, 500)
client.shutdown()

# One worker process per simulated connection.
processes = [
    multiprocessing.Process(target=optimistic_locking, args=(cluster_settings,))
    for worker_index in range(3)
]

print(f'Starting {len(processes)} processes!')

start_time = time.time()

for process in processes:
    process.start()

print('All processes are started!')

# Block until every worker has finished its loop.
for process in processes:
    process.join()

end_time = time.time()

print('All processes are finished. Processing took {:.2f} seconds.'.format(end_time - start_time))

# Read the final value back through a fresh client.
client = hazelcast.HazelcastClient(**cluster_settings)
map = client.get_map('optimistic_locking_map').blocking()
result = map.get(1)
client.shutdown()

print(f'After all gets and puts record is (1: {result})')

Starting 3 processes!
Starting. PID: 769
At: 0. PID: 769
Starting. PID: 771
At: 0. PID: 771Starting. PID: 774

At: 0. PID: 774
All processes are started!
At: 10. PID: 769
At: 10. PID: 771
At: 10. PID: 774
At: 20. PID: 771
At: 20. PID: 774
At: 20. PID: 769
At: 30. PID: 771
At: 30. PID: 769
At: 30. PID: 774
At: 40. PID: 771
At: 50. PID: 771
At: 40. PID: 769
At: 50. PID: 769
At: 40. PID: 774
At: 60. PID: 769
At: 60. PID: 771
At: 50. PID: 774
At: 60. PID: 774
At: 70. PID: 771
At: 80. PID: 771
At: 70. PID: 774
At: 90. PID: 771
Finished! Result = 742. Processing took 1212.03 seconds. PID: 771
At: 70. PID: 769
At: 80. PID: 774
At: 80. PID: 769
At: 90. PID: 769
At: 90. PID: 774
Finished! Result = 790. Processing took 1452.47 seconds. PID: 769
Finished! Result = 800. Processing took 1502.50 seconds. PID: 774
All processes are finished. Processing took 1502.66 seconds.
After all gets and puts record is (1: 800)


#### Task 5
Set up Bounded queue
Using Distributed Queue (http://docs.hazelcast.org/docs/latest/manual/html-single/index.html#queue) set up Bounded queue (http://docs.hazelcast.org/docs/latest/manual/html-single/index.html#setting-a-bounded-queue)


In [6]:
# Empty 'my_queue' and print how many more items it can accept.
client = hazelcast.HazelcastClient(**cluster_settings)
try:
    queue = client.get_queue('my_queue').blocking()
    queue.clear()
    print(queue.remaining_capacity())
finally:
    # Release the connection even if one of the queue calls fails.
    client.shutdown()

20


In [7]:
def producer(cluster_settings):
    """Put the integers 0..99 into 'my_queue', then a ``-1`` end marker.

    One item is put every 3 seconds; each successful put is reported on
    stdout. The elapsed time is printed and the client is shut down when the
    function exits, whether normally or through an exception.

    Args:
        cluster_settings: keyword arguments forwarded to
            ``hazelcast.HazelcastClient``.
    """
    hz_client = hazelcast.HazelcastClient(**cluster_settings)
    work_queue = hz_client.get_queue('my_queue').blocking()
    worker_pid = os.getpid()
    started_at = time.time()
    print(f'Producer started. PID: {worker_pid}')
    try:
        item = 0
        while item < 100:
            work_queue.put(item)
            print(f'Producing: {item}')
            time.sleep(3)
            item += 1
        # End marker: consumers stop when they take -1.
        work_queue.put(-1)
    finally:
        elapsed = time.time() - started_at
        print('Producer finished! Processing took {:.2f} seconds. PID: {}'.format(
            elapsed,
            worker_pid
        ))
        hz_client.shutdown()
    
def consumer(cluster_settings):
    """Take items from 'my_queue' until the ``-1`` end marker is seen.

    Each taken item is reported on stdout, followed by a 2-second pause.
    When ``-1`` is taken it is put back into the queue before the loop ends,
    so that another consumer taking from the same queue also sees it.

    The elapsed time is printed and the client is shut down when the
    function exits, whether normally or through an exception.

    Args:
        cluster_settings: keyword arguments forwarded to
            ``hazelcast.HazelcastClient``.
    """
    client = hazelcast.HazelcastClient(**cluster_settings)
    queue = client.get_queue('my_queue').blocking()
    pid = os.getpid()
    start_time = time.time()
    # The original messages said "Producer" here (copy-paste from producer()),
    # which made consumer and producer output indistinguishable.
    print(f'Consumer started. PID: {pid}')
    try:
        while True:
            item = queue.take()
            print(f'Consumed: {item}')
            if item == -1:
                # Re-publish the end marker for the remaining consumers.
                queue.put(-1)
                break
            time.sleep(2)
    finally:
        end_time = time.time()
        print('Consumer finished! Processing took {:.2f} seconds. PID: {}'.format(
            end_time - start_time,
            pid
        ))
        client.shutdown()

a) test Bounded queue by running one producer and two consumers:

In [11]:
# One producer and two consumers, each in its own process.
producer_1 = multiprocessing.Process(target=producer, args=(cluster_settings,))
consumer_1, consumer_2 = (
    multiprocessing.Process(target=consumer, args=(cluster_settings,))
    for slot in range(2)
)

workers = (producer_1, consumer_1, consumer_2)

# Start everything first, then wait for each process in the same order.
for worker in workers:
    worker.start()

for worker in workers:
    worker.join()

Producer started. PID: 921
Producer started. PID: 923Producer started. PID: 925

Producing: 0
Consumed: 0
Consumed: 1Producing: 1

Producing: 2Consumed: 2

Producing: 3Consumed: 3

Producing: 4Consumed: 4

Producing: 5Consumed: 5

Consumed: 6Producing: 6

Consumed: 7Producing: 7

Consumed: 8Producing: 8

Producing: 9Consumed: 9

Producing: 10Consumed: 10

Producing: 11Consumed: 11

Producing: 12Consumed: 12

Producing: 13Consumed: 13

Producing: 14Consumed: 14

Consumed: 15Producing: 15

Producing: 16Consumed: 16

Consumed: 17Producing: 17

Consumed: 18Producing: 18

Consumed: 19Producing: 19

Producing: 20Consumed: 20

Consumed: 21Producing: 21

Producing: 22Consumed: 22

Consumed: 23Producing: 23

Consumed: 24Producing: 24

Producing: 25Consumed: 25

Producing: 26Consumed: 26

Producing: 27Consumed: 27

Consumed: 28Producing: 28

Producing: 29Consumed: 29

Producing: 30Consumed: 30

Consumed: 31Producing: 31

Producing: 32Consumed: 32

Producing: 33Consumed: 33

Producing: 34Consumed

b) check the behaviour of the Bounded queue when the producer runs but there are no consumers:

In [8]:
# Run a single producer with no consumers attached to the queue.
# The Process gets its own name: assigning it to `producer` would replace the
# producer() function in the notebook namespace, so re-running this cell (or
# any other cell that uses target=producer) would pass a Process object as the
# target instead of the function.
producer_process = multiprocessing.Process(target=producer, args=(cluster_settings,))

producer_process.start()

producer_process.join()

Producer started. PID: 2038
Producing: 0
Producing: 1
Producing: 2
Producing: 3
Producing: 4
Producing: 5
Producing: 6
Producing: 7
Producing: 8
Producing: 9
Producing: 10
Producing: 11
Producing: 12
Producing: 13
Producing: 14
Producing: 15
Producing: 16
Producing: 17
Producing: 18
Producing: 19
Producer finished! Processing took 71.94 seconds. PID: 2038


  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
Process Process-3:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/tmp/ipykernel_1991/228812298.py", line 9, in producer
    queue.put(k)
  File "/usr/local/lib/python3.8/dist-packages/hazelcast/future.py", line 325, in f
    return result.result()
  File "/usr/local/lib/python3.8/dist-packages/hazelcast/future.py", line 58, in result
    self._event.wait()
  File "/usr/local/lib/python3.8/dist-packages/hazelcast/future.py", line 191, in wait
    self.condition.wait()
  File "/usr/lib/python3.8/threading.py", line 302, in wait
    waiter.acquire()
KeyboardInterrupt


KeyboardInterrupt: 

c) check how multiple consumers will read items from Bounded queue:

In [9]:
# Two consumers, each in its own process, taking from the same queue.
consumer_1, consumer_2 = (
    multiprocessing.Process(target=consumer, args=(cluster_settings,))
    for slot in range(2)
)

# Start both first, then wait for each in the same order.
for worker in (consumer_1, consumer_2):
    worker.start()

for worker in (consumer_1, consumer_2):
    worker.join()

Producer started. PID: 2148
Consumed: 0
Producer started. PID: 2150
Consumed: 1
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Consumed: 6
Consumed: 7
Consumed: 8
Consumed: 9
Consumed: 10
Consumed: 11
Consumed: 12
Consumed: 13
Consumed: 14
Consumed: 15
Consumed: 16
Consumed: 17
Consumed: 18
Consumed: 19


Process Process-5:
Process Process-4:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()


KeyboardInterrupt: 

  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_1991/228812298.py", line 28, in consumer
    item = queue.take()
  File "/tmp/ipykernel_1991/228812298.py", line 28, in consumer
    item = queue.take()
  File "/usr/local/lib/python3.8/dist-packages/hazelcast/future.py", line 325, in f
    return result.result()
  File "/usr/local/lib/python3.8/dist-packages/hazelcast/future.py", line 58, in result
    self._event.wait()
  File "/usr/local/lib/python3.8/dist-packages/hazelcast/future.py", line 325, in f
    return result.result()
  File "/usr/local/lib/python3.8/dist-packages/hazelcast/future.py", line 191, in wait
    self.condition.wait()
  File "/usr/local/lib/python3.8/dist-package

#### Task 6
Set up a Distributed Topic (https://docs.hazelcast.com/hazelcast/5.0/data-structures/topic) and check its behaviour:
check the behaviour with the `globalOrderEnabled` setting and without it.

In [10]:
def listener(message):
    """Topic callback: print the payload carried in ``message.message``."""
    payload = message.message
    print('Got a message: {}'.format(payload))

a) with globalOrderEnabled:

In [11]:
# Subscriber/publisher connected through node 1.
client_1 = hazelcast.HazelcastClient(cluster_name='dev', cluster_members=['hazelcast_node_1:5701'])
topic_1 = client_1.get_topic('my_topic_global_order').blocking()
topic_1.add_listener(listener)

# Subscriber connected through node 2.
client_2 = hazelcast.HazelcastClient(cluster_name='dev', cluster_members=['hazelcast_node_2:5701'])
topic_2 = client_2.get_topic('my_topic_global_order').blocking()
topic_2.add_listener(listener)

# Publish-only client connected through node 3.
client_3 = hazelcast.HazelcastClient(cluster_name='dev', cluster_members=['hazelcast_node_3:5701'])
topic_3 = client_3.get_topic('my_topic_global_order').blocking()

# Publish 20 messages, 5 seconds apart, choosing the publishing client at random.
for i in range(20):
    publishing_topic = topic_1 if random.getrandbits(1) else topic_3
    publishing_topic.publish(f'Message_{i}')
    time.sleep(5)

for hz_client in (client_1, client_2, client_3):
    hz_client.shutdown()

Got a message: Message_0
Got a message: Message_0
Got a message: Message_1
Got a message: Message_1
Got a message: Message_2Got a message: Message_2

Got a message: Message_3Got a message: Message_3

Got a message: Message_4Got a message: Message_4

Got a message: Message_5Got a message: Message_5

Got a message: Message_6Got a message: Message_6

Got a message: Message_7
Got a message: Message_7
Got a message: Message_8
Got a message: Message_8
Got a message: Message_9Got a message: Message_9

Got a message: Message_10Got a message: Message_10

Got a message: Message_11
Got a message: Message_11
Got a message: Message_12
Got a message: Message_12
Got a message: Message_13Got a message: Message_13

Got a message: Message_14
Got a message: Message_14
Got a message: Message_15Got a message: Message_15

Got a message: Message_16Got a message: Message_16

Got a message: Message_17Got a message: Message_17

Got a message: Message_18Got a message: Message_18

Got a message: Message_19Got a m

b) without globalOrderEnabled:

In [12]:
# Same experiment as the global-order cell, against a differently named topic.
# The proxies are obtained with .blocking(), exactly as in that cell, so that
# add_listener() and publish() complete before the next statement runs; without
# it they return futures that were never waited on, and the two experiments
# differed in more than the topic they used.

# Subscriber/publisher connected through node 1.
client_1 = hazelcast.HazelcastClient(cluster_name='dev', cluster_members=['hazelcast_node_1:5701'])
topic_1 = client_1.get_topic('my_topic_no_global_order').blocking()
topic_1.add_listener(listener)

# Subscriber connected through node 2.
client_2 = hazelcast.HazelcastClient(cluster_name='dev', cluster_members=['hazelcast_node_2:5701'])
topic_2 = client_2.get_topic('my_topic_no_global_order').blocking()
topic_2.add_listener(listener)

# Publish-only client connected through node 3.
client_3 = hazelcast.HazelcastClient(cluster_name='dev', cluster_members=['hazelcast_node_3:5701'])
topic_3 = client_3.get_topic('my_topic_no_global_order').blocking()

# Publish 20 messages, 5 seconds apart, choosing the publishing client at random.
for i in range(20):
    if bool(random.getrandbits(1)):
        topic_1.publish(f'Message_{i}')
    else:
        topic_3.publish(f'Message_{i}')
    time.sleep(5)

client_1.shutdown()
client_2.shutdown()
client_3.shutdown()

Got a message: Message_0
Got a message: Message_0
Got a message: Message_1Got a message: Message_1

Got a message: Message_2Got a message: Message_2

Got a message: Message_3
Got a message: Message_3
Got a message: Message_4Got a message: Message_4

Got a message: Message_5Got a message: Message_5

Got a message: Message_6Got a message: Message_6

Got a message: Message_7Got a message: Message_7

Got a message: Message_8Got a message: Message_8

Got a message: Message_9Got a message: Message_9

Got a message: Message_10Got a message: Message_10

Got a message: Message_11Got a message: Message_11

Got a message: Message_12
Got a message: Message_12
Got a message: Message_13Got a message: Message_13

Got a message: Message_14Got a message: Message_14

Got a message: Message_15Got a message: Message_15

Got a message: Message_16Got a message: Message_16

Got a message: Message_17
Got a message: Message_17
Got a message: Message_18Got a message: Message_18

Got a message: Message_19Got a m