In [59]:
import hazelcast

# Konfiguracja klienta Hazelcast
client = hazelcast.HazelcastClient(
            cluster_name = "dev",
            cluster_members=["127.0.0.1:5701", "127.0.0.1:5702", "127.0.0.1:5703"],
        )


In [60]:
from hazelcast.aggregator import count, number_avg, max_by
from hazelcast.predicate import less_or_equal

imap = client.get_map("coinbase_trades").blocking()
  
# # Pobranie danych z IMap
btc_usd_data = imap.values()
print("BTC-USD Data from Hazelcast:", btc_usd_data)



BTC-USD Data from Hazelcast: [{'type': 'ticker', 'sequence': 95434878505, 'product_id': 'BTC-USD', 'price': '94465.55', 'open_24h': '96306.97', 'volume_24h': '9796.02105156', 'low_24h': '93234.31', 'high_24h': '96379.81', 'volume_30d': '407833.68425424', 'best_bid': '94465.54', 'best_bid_size': '0.06440030', 'best_ask': '94465.55', 'best_ask_size': '0.29420024', 'side': 'buy', 'time': '2024-12-28T13:02:43.999958Z', 'trade_id': 749607269, 'last_size': '0.00000529'}, {'type': 'ticker', 'sequence': 95434966351, 'product_id': 'BTC-USD', 'price': '94411.71', 'open_24h': '96306.97', 'volume_24h': '9804.11821635', 'low_24h': '93234.31', 'high_24h': '96379.81', 'volume_30d': '407841.78141903', 'best_bid': '94411.71', 'best_bid_size': '0.00008738', 'best_ask': '94411.72', 'best_ask_size': '0.00042379', 'side': 'sell', 'time': '2024-12-28T13:05:33.945317Z', 'trade_id': 749608082, 'last_size': '0.00000953'}, {'type': 'ticker', 'sequence': 72613841371, 'product_id': 'ETH-USD', 'price': '3348.11', 

In [30]:
cities = client.get_map("cities").blocking()

client.sql.execute(
    """
CREATE MAPPING cities (
__key INT,
countries VARCHAR,
cities VARCHAR)
TYPE IMap
OPTIONS('keyFormat'='int', 'valueFormat'='json-flat')

    """
).result()

client.sql.execute(
    """
INSERT INTO cities VALUES
(1, 'United Kingdom','London'),
(2, 'United Kingdom','Manchester'),
(3, 'United States', 'New York'),
(4, 'United States', 'Los Angeles'),
(5, 'Turkey', 'Ankara'),
(6, 'Turkey', 'Istanbul'),
(7, 'Brazil', 'Sao Paulo'),
(8, 'Brazil', 'Rio de Janeiro')
    """
).result()

result = client.sql.execute("SELECT * FROM cities").result()


In [32]:
for row in result:
    print(row)

[__key INTEGER=2, countries VARCHAR=United Kingdom, cities VARCHAR=Manchester]
[__key INTEGER=7, countries VARCHAR=Brazil, cities VARCHAR=Sao Paulo]
[__key INTEGER=1, countries VARCHAR=United Kingdom, cities VARCHAR=London]
[__key INTEGER=3, countries VARCHAR=United States, cities VARCHAR=New York]
[__key INTEGER=6, countries VARCHAR=Turkey, cities VARCHAR=Istanbul]
[__key INTEGER=5, countries VARCHAR=Turkey, cities VARCHAR=Ankara]
[__key INTEGER=8, countries VARCHAR=Brazil, cities VARCHAR=Rio de Janeiro]
[__key INTEGER=4, countries VARCHAR=United States, cities VARCHAR=Los Angeles]


In [None]:
# Zamknięcie klienta Hazelcast
client.shutdown()

## SQL do Ticker

In [None]:
trades = client.get_map("coinbase_trades").blocking()

client.sql.execute(
    """

CREATE OR REPLACE MAPPING tickers
(
    __key BIGINT,
    type VARCHAR,
    product_id VARCHAR,
    price DECIMAL,
    open_24h DECIMAL,
    volume_24h DECIMAL,
    low_24h DECIMAL,
    high_24h DECIMAL,
    volume_30d DECIMAL,
    best_bid DECIMAL,
    best_bid_size DECIMAL,
    best_ask DECIMAL,
    best_ask_size DECIMAL,
    side VARCHAR,
    "time" TIMESTAMP,
    trade_id BIGINT,
    last_size DECIMAL
)
TYPE IMap
OPTIONS (
    'keyFormat' = 'bigint',          -- Format klucza (BIGINT w tym przypadku)
    'valueFormat' = 'json'           -- Format wartości (JSON dla przechowywania złożonych struktur)
);

    """
).result()

# docker-app-1         | Inserted into Hazelcast IMap: 95433858445 -> 
# {'type': 'ticker', 'sequence': 95433858445, 'product_id': 'BTC-USD', 'price': '94486.17', 'open_24h': '96567.47', 
# 'volume_24h': '9987.54055079', 'low_24h': '93234.31', 'high_24h': '96567.47', 'volume_30d': '407793.90999872', 
# 'best_bid': '94484.84', 'best_bid_size': '0.00118175', 'best_ask': '94488.83', 'best_ask_size': '0.21166524', 
# 'side': 'buy', 'time': '2024-12-28T12:27:02.299617Z', 'trade_id': 749599846, 'last_size': '0.00011641'}


HazelcastSqlError: Cannot rename field: '__key'

In [54]:
result_trades = client.sql.execute("SELECT * FROM coinbase_trades_2").result()

for row in result_trades:
    print(row)

#### Client Jet
Jet Streams to mechanizm umożliwiający obsługę strumieni danych w czasie rzeczywistym. Pozwala on na przetwarzanie strumieni danych (np. z WebSocketów) i ich przesyłanie do innych komponentów systemu, takich jak IMap w Hazelcast.

In [7]:
import hazelcast
from hazelcast.jet import JetClient

# Tworzymy klienta Hazelcast Jet
client = hazelcast.HazelcastClient(
    cluster_name="dev",  # Nazwa klastra
    network_config={
        "tcp_ip": {
            "enabled": True,
            "member_list": ["127.0.0.1:5701", "127.0.0.1:5702", "127.0.0.1:5703"]
        },
    },
)

# Tworzymy klienta Jet
jet_client = JetClient.new_client()

# Uzyskujemy dostęp do mapy
map_name = "trades"
imap = client.get_map(map_name).blocking()

# Sprawdzamy połączenie
print(f"Connected to Hazelcast Jet cluster {client.get_cluster().get_cluster_id()}")


ModuleNotFoundError: No module named 'hazelcast.jet'