In [63]:
import hazelcast
from hazelcast.config import Config
import concurrent.futures
import time
import os

Підключення до кластеру

In [64]:
# Connect a Hazelcast client to the three member addresses listed below and
# print the membership blocks found in the log of the `hz-1` container.
print("connection...")
config = Config()
config.cluster_members = ['localhost:5701', 'localhost:5702', 'localhost:5703']  
client = hazelcast.HazelcastClient(config)
# `sudo -A` obtains the password through the helper program named in SUDO_ASKPASS.
# NOTE(review): absolute, user-specific path; the cell will not run on another machine as-is.
os.environ['SUDO_ASKPASS'] = '/home/user/sudo_askpass.sh'
# grep -P (Perl regex) -z (treat input as one record so the pattern can span
# lines) -o (print only the matches): extracts every "Members {...} [ ... ]" block.
os.system('sudo -A docker logs hz-1 | grep -Pzo \'Members {.*} \\[\\n(.*Member.*\\n)+\\]\\n\'')
print("connected :-)")

connection...
Members {size:1, ver:1} [
	Member [172.25.0.3]:5701 - aff08c14-86f7-4c84-ab26-34bd10c229a7 this
]
 Members {size:2, ver:2} [
	Member [172.25.0.3]:5701 - aff08c14-86f7-4c84-ab26-34bd10c229a7 this
	Member [172.25.0.2]:5701 - 212c805f-8b21-4655-8d00-0eded124233e
]
 Members {size:3, ver:3} [
	Member [172.25.0.3]:5701 - aff08c14-86f7-4c84-ab26-34bd10c229a7 this
	Member [172.25.0.2]:5701 - 212c805f-8b21-4655-8d00-0eded124233e
	Member [172.25.0.4]:5701 - e4ea258f-3a95-46ba-983a-a32e252ccddc
]
 connected :-)


### 1. Каунтер без блокувань

In [65]:
def inc_map_noblock(map, key, num_iterations):
    """Increment the value stored under ``key`` without any synchronization.

    Every round reads the current value with ``map.get`` and then writes that
    value plus one with ``map.put``. The read and the write are two separate
    calls, so an update made by another caller in between is overwritten.

    Args:
        map: Object exposing ``get(key)`` and ``put(key, value)``.
        key: Key of the counter entry.
        num_iterations: Number of read-then-write rounds to perform.
    """
    for _round in range(num_iterations):
        current = map.get(key)
        map.put(key, current + 1)

# Experiment 1: several threads increment one shared map entry with a plain
# get-then-put, i.e. with no locking at all.
map_name = 'map-1'
distributed_map = client.get_map(map_name).blocking()
key = 'counter'
distributed_map.put(key, 0)  # reset the counter before the run
num_threads = 10
num_iterations_per_thread = 1000
total_expected = num_threads * num_iterations_per_thread
print(f"Start value: {distributed_map.get(key)}")

print("--==  started  ==--")
# perf_counter is a monotonic clock intended for measuring intervals.
start_time = time.perf_counter()

# Leaving the ``with`` block waits for every submitted task, so no separate
# concurrent.futures.wait() call is needed afterwards.
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    futures = [
        executor.submit(inc_map_noblock, distributed_map, key, num_iterations_per_thread)
        for _ in range(num_threads)
    ]

end_time = time.perf_counter()

# Re-raise the first worker exception, if any; otherwise a crashed thread would
# only show up as a misleading "Not equal" result.
for future in futures:
    future.result()

print("--==  ended  ==--")

elapsed = end_time - start_time
final_value = distributed_map.get(key)
# Single quotes inside the f-string: reusing the outer quote character is a
# SyntaxError on Python versions older than 3.12.
print(f"End value: {final_value} (expected {total_expected}) {'Equal' if final_value == total_expected else 'Not equal'}")
print(f"Done in: {round(elapsed, 3)} sec ({round(elapsed / 60, 3)}) min")

Start value: 0
--==  started  ==--
--==  ended  ==--
End value: 1520 (expected 10000) Not equal
Done in: 20.355 sec (0.339) min


### 2. Каунтер з песимістичним блокуванням

In [66]:
def inc_map_pessimistic(map, key, num_iterations):
    """Increment the value stored under ``key`` while holding the key's lock.

    Each round calls ``map.lock(key)``, performs the get-then-put increment,
    and always calls ``map.unlock(key)`` afterwards, even when the increment
    raises.

    Args:
        map: Object exposing ``lock``, ``unlock``, ``get`` and ``put``.
        key: Key of the counter entry; also the key that is locked.
        num_iterations: Number of locked increments to perform.
    """
    for _round in range(num_iterations):
        map.lock(key)
        try:
            current = map.get(key)
            map.put(key, current + 1)
        finally:
            # Release on every path so a failure does not leave the key locked.
            map.unlock(key)

# Experiment 2: several threads increment one shared map entry, each increment
# performed while holding the key's lock.
map_name = 'map-1'
distributed_map = client.get_map(map_name).blocking()
key = 'counter'
distributed_map.put(key, 0)  # reset the counter before the run
num_threads = 10
num_iterations_per_thread = 1000
total_expected = num_threads * num_iterations_per_thread
print(f"Start value: {distributed_map.get(key)}")

print("--==  started  ==--")
# perf_counter is a monotonic clock intended for measuring intervals.
start_time = time.perf_counter()

# Leaving the ``with`` block waits for every submitted task, so no separate
# concurrent.futures.wait() call is needed afterwards.
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    futures = [
        executor.submit(inc_map_pessimistic, distributed_map, key, num_iterations_per_thread)
        for _ in range(num_threads)
    ]

end_time = time.perf_counter()

# Re-raise the first worker exception, if any; otherwise a crashed thread would
# only show up as a misleading "Not equal" result.
for future in futures:
    future.result()

print("--==  ended  ==--")

elapsed = end_time - start_time
final_value = distributed_map.get(key)
# Single quotes inside the f-string: reusing the outer quote character is a
# SyntaxError on Python versions older than 3.12.
print(f"End value: {final_value} (expected {total_expected}) {'Equal' if final_value == total_expected else 'Not equal'}")
print(f"Done in: {round(elapsed, 3)} sec ({round(elapsed / 60, 3)}) min")

Start value: 0
--==  started  ==--
--==  ended  ==--
End value: 10000 (expected 10000) Equal
Done in: 392.404 sec (6.54) min


### 3. Каунтер з оптимістичним блокуванням

In [67]:
def inc_map_optimistic(distributed_map, key, num_iterations):
    """Increment the value stored under ``key`` using compare-and-replace retries.

    Each round reads the current value and asks the map to replace it with
    that value plus one via ``replace_if_same``. While that call reports
    failure (a falsy result), the read and the replace are repeated.

    Args:
        distributed_map: Object exposing ``get(key)`` and
            ``replace_if_same(key, old_value, new_value)``.
        key: Key of the counter entry.
        num_iterations: Number of successful increments to perform.
    """
    for _round in range(num_iterations):
        swapped = False
        while not swapped:
            seen = distributed_map.get(key)
            swapped = distributed_map.replace_if_same(key, seen, seen + 1)

# Experiment 3: several threads increment one shared map entry with an
# optimistic read / replace_if_same retry loop.
map_name = 'map-1'
distributed_map = client.get_map(map_name).blocking()
key = 'counter'
distributed_map.put(key, 0)  # reset the counter before the run
num_threads = 10
num_iterations_per_thread = 1000
total_expected = num_threads * num_iterations_per_thread
print(f"Start value: {distributed_map.get(key)}")

print("--==  started  ==--")
# perf_counter is a monotonic clock intended for measuring intervals.
start_time = time.perf_counter()

# Leaving the ``with`` block waits for every submitted task, so no separate
# concurrent.futures.wait() call is needed afterwards.
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    futures = [
        executor.submit(inc_map_optimistic, distributed_map, key, num_iterations_per_thread)
        for _ in range(num_threads)
    ]

end_time = time.perf_counter()

# Re-raise the first worker exception, if any; otherwise a crashed thread would
# only show up as a misleading "Not equal" result.
for future in futures:
    future.result()

print("--==  ended  ==--")

elapsed = end_time - start_time
final_value = distributed_map.get(key)
# Single quotes inside the f-string: reusing the outer quote character is a
# SyntaxError on Python versions older than 3.12.
print(f"End value: {final_value} (expected {total_expected}) {'Equal' if final_value == total_expected else 'Not equal'}")
print(f"Done in: {round(elapsed, 3)} sec ({round(elapsed / 60, 3)}) min")

Start value: 0
--==  started  ==--
--==  ended  ==--
End value: 10000 (expected 10000) Equal
Done in: 78.694 sec (1.312) min


Створення кластеру з CP Subsystem

In [68]:
# Close the client of the first cluster, then connect a second client to the
# cluster named "test" and print the CP group membership blocks found in the
# log of the `hz-cp-1` container.
client.shutdown()
print("connection...")
config2 = Config()
config2.cluster_members = ['localhost:5704', 'localhost:5705', 'localhost:5706']  
config2.cluster_name = "test"
client2 = hazelcast.HazelcastClient(config2)
# `sudo -A` obtains the password through the helper program named in SUDO_ASKPASS.
# NOTE(review): absolute, user-specific path; the cell will not run on another machine as-is.
os.environ['SUDO_ASKPASS'] = '/home/user/sudo_askpass.sh'
# grep -P (Perl regex) -z (treat input as one record so the pattern can span
# lines) -o (print only the matches): extracts every "CP Group Members {...} [ ... ]" block.
os.system('sudo -A docker logs hz-cp-1 | grep -Pzo \'CP Group Members {.*} \\[\\n(.*CPMember{.*\\n)+\\]\\n\'')
print("connected :-)")

connection...
CP Group Members {groupId: METADATA(0), size:3, term:1, logIndex:0} [
	CPMember{uuid=b07b3a82-6726-4ae2-840d-919eee98df49, address=[172.26.0.4]:5701} - FOLLOWER this
	CPMember{uuid=e1756991-ab19-4f63-ad51-d09bd44d7a34, address=[172.26.0.3]:5701}
	CPMember{uuid=3828e161-f5bb-43e1-9b8c-84d60fc94efc, address=[172.26.0.2]:5701}
]
 CP Group Members {groupId: METADATA(0), size:3, term:1, logIndex:0} [
	CPMember{uuid=b07b3a82-6726-4ae2-840d-919eee98df49, address=[172.26.0.4]:5701} - FOLLOWER this
	CPMember{uuid=e1756991-ab19-4f63-ad51-d09bd44d7a34, address=[172.26.0.3]:5701}
	CPMember{uuid=3828e161-f5bb-43e1-9b8c-84d60fc94efc, address=[172.26.0.2]:5701} - LEADER
]
 CP Group Members {groupId: default(5933), size:3, term:1, logIndex:0} [
	CPMember{uuid=b07b3a82-6726-4ae2-840d-919eee98df49, address=[172.26.0.4]:5701} - FOLLOWER this
	CPMember{uuid=3828e161-f5bb-43e1-9b8c-84d60fc94efc, address=[172.26.0.2]:5701}
	CPMember{uuid=e1756991-ab19-4f63-ad51-d09bd44d7a34, address=[172.26.0.

### 4. Каунтер з використанням IAtomicLong

In [69]:
def inc_atomic_long(counter, num_iterations):
    """Call ``counter.increment_and_get()`` ``num_iterations`` times.

    The value returned by each call is discarded.

    Args:
        counter: Object exposing ``increment_and_get()``.
        num_iterations: Number of increments to perform.
    """
    for _round in range(num_iterations):
        counter.increment_and_get()

# Experiment 4: several threads increment one shared atomic long obtained from
# the CP subsystem of the second client.
counter_name = 'counter'
counter = client2.cp_subsystem.get_atomic_long(counter_name).blocking()

num_threads = 10
num_iterations_per_thread = 1000
total_expected = num_threads * num_iterations_per_thread

counter.set(0)  # reset the counter before the run
print(f"Start value: {counter.get()}")

print("--==  started  ==--")
# perf_counter is a monotonic clock intended for measuring intervals.
start_time = time.perf_counter()

# Leaving the ``with`` block waits for every submitted task, so no separate
# concurrent.futures.wait() call is needed.
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    futures = [
        executor.submit(inc_atomic_long, counter, num_iterations_per_thread)
        for _ in range(num_threads)
    ]

end_time = time.perf_counter()

# Re-raise the first worker exception, if any; otherwise a crashed thread would
# only show up as a misleading "Not equal" result.
for future in futures:
    future.result()

print("--==  ended  ==--")

elapsed = end_time - start_time
final_value = counter.get()
print(f"End value: {final_value} (expected {total_expected}) {'Equal' if final_value == total_expected else 'Not equal'}")
print(f"Done in: {round(elapsed, 3)} sec ({round(elapsed / 60, 3)} min)")

# Shut down the client used in this experiment. The first client was already
# shut down when the second connection was opened.
client2.shutdown()

Start value: 0
--==  started  ==--
--==  ended  ==--
End value: 10000 (expected 10000) Equal
Done in: 9.843 sec (0.164 min)
