In [1]:
import hazelcast
import time

hz = hazelcast.HazelcastClient(cluster_name='lab1')

In [2]:
map = hz.get_map('my-distributed-map').blocking() 
for key in range(1000): map.set(key, key)

In [None]:
import multiprocessing
import hazelcast
import time

def increment_without_lock(iterations=10000, batch_size=10):
    """
    Кожен процес локально інкрементує значення batch_size разів,
    після чого робить один мережевий запит get/put.
    """
    client = hazelcast.HazelcastClient()
    distributed_map = client.get_map("distributed-map").blocking()
    
    # Ініціалізація ключа "key", якщо він ще не встановлений
    if distributed_map.get("key") is None:
        distributed_map.put("key", 0)

    local_increments = 0
    for i in range(iterations):
        local_increments += 1

        # Кожні batch_size ітерацій оновлюємо значення у Hazelcast
        if (i + 1) % batch_size == 0:
            current_val = distributed_map.get("key")
            new_val = current_val + local_increments
            distributed_map.put("key", new_val)
            local_increments = 0

    # Записуємо залишок (якщо є)
    if local_increments > 0:
        current_val = distributed_map.get("key")
        new_val = current_val + local_increments
        distributed_map.put("key", new_val)

    client.shutdown()


if __name__ == "__main__":
    processes = []
    start_time = time.time()
    
    # Запуск 3 процесів, кожен робить 10_000 інкрементів
    for i in range(3):
        p = multiprocessing.Process(
            target=increment_without_lock,
            name=f"P{i+1}",
            args=(10000, 10)  # 10_000 ітерацій, оновлення кожні 10
        )
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    # Підключаємося окремо для виведення фінального значення
    client = hazelcast.HazelcastClient()
    final_value = client.get_map("distributed-map").blocking().get("key")
    elapsed = time.time() - start_time
    print(f"Фінальне значення ключа 'key' (без блокувань): {final_value}. Час виконання: {elapsed:.2f} с.")
    client.shutdown()


In [None]:
import multiprocessing
import hazelcast
import time

def increment_with_pessimistic_lock(iterations=10000):
    client = hazelcast.HazelcastClient()
    distributed_map = client.get_map("distributed-map").blocking()
    lock = client.get_lock("key_lock")
    
    # Ініціалізація ключа "key"
    if distributed_map.get("key") is None:
        distributed_map.put("key", 0)
    
    for i in range(iterations):
        lock.lock()
        try:
            value = distributed_map.get("key")
            new_value = value + 1
            distributed_map.put("key", new_value)
        finally:
            lock.unlock()
        
        
    client.shutdown()

if __name__ == "__main__":
    processes = []
    start_time = time.time()
    
    for i in range(3):
        p = multiprocessing.Process(
            target=increment_with_pessimistic_lock,
            name=f"P{i+1}",
            args=(10000,)
        )
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    
    client = hazelcast.HazelcastClient()
    final_value = client.get_map("distributed-map").blocking().get("key")
    elapsed = time.time() - start_time
    print(f"Фінальне значення ключа 'key' (песимістичне блокування): {final_value}. Час: {elapsed:.2f} с.")
    client.shutdown()


In [None]:
import multiprocessing
import hazelcast
import time

def increment_with_optimistic_lock(iterations=10000):
    client = hazelcast.HazelcastClient()
    distributed_map = client.get_map("distributed-map").blocking()
    
    # Ініціалізація ключа "key"
    if distributed_map.get("key") is None:
        distributed_map.put("key", 0)
    
    for i in range(iterations):
        while True:
            old_value = distributed_map.get("key")
            new_value = old_value + 1
            # replace_if_same повертає True, якщо заміна пройшла успішно
            if distributed_map.replace_if_same("key", old_value, new_value):
                break
        
    client.shutdown()

if __name__ == "__main__":
    processes = []
    start_time = time.time()
    
    for i in range(3):
        p = multiprocessing.Process(
            target=increment_with_optimistic_lock,
            name=f"P{i+1}",
            args=(10000,)
        )
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    
    client = hazelcast.HazelcastClient()
    final_value = client.get_map("distributed-map").blocking().get("key")
    elapsed = time.time() - start_time
    print(f"Фінальне значення ключа 'key' (оптимістичне блокування): {final_value}. Час: {elapsed:.2f} с.")
    client.shutdown()


In [None]:
import multiprocessing
import hazelcast
import time

def producer():
    client = hazelcast.HazelcastClient()
    queue = client.get_queue("bounded-queue").blocking()
    
    # Запис чисел від 1 до 100
    for value in range(1, 101):
        print(f"[Producer] Додаємо значення: {value}")
        queue.put(value)  # Якщо черга заповнена (максимум 10 елементів), ця операція блокуватиметься
        time.sleep(0.1)    # Невелика затримка для наочності

    # Після завершення виробництва надсилаємо "сигнали завершення" для споживачів
    print("[Producer] Надсилаємо сигнали завершення (None) для споживачів.")
    # Оскільки споживачів два, надсилаємо два None
    queue.put(None)
    queue.put(None)
    
    client.shutdown()

def consumer(consumer_id):
    client = hazelcast.HazelcastClient()
    queue = client.get_queue("bounded-queue").blocking()
    
    while True:
        item = queue.take()  # Блокується, якщо черга порожня
        if item is None:
            print(f"[Consumer {consumer_id}] Отримано сигнал завершення. Завершення роботи.")
            break
        print(f"[Consumer {consumer_id}] Отримано значення: {item}")
    
    client.shutdown()

if __name__ == "__main__":
    processes = []
    
    # Запуск процесу-продюсера
    prod_process = multiprocessing.Process(target=producer, name="Producer")
    prod_process.start()
    processes.append(prod_process)
    
    # Запуск двох процесів-споживачів
    for i in range(1, 3):
        cons_process = multiprocessing.Process(target=consumer, name=f"Consumer-{i}", args=(i,))
        cons_process.start()
        processes.append(cons_process)
    
    # Очікуємо завершення всіх процесів
    for p in processes:
        p.join()
    
    print("Усі процеси завершено. Робота Bounded Queue демонстрована успішно.")
