In [2]:
# Start Kafka, Elasticsearch, Kibana
!docker compose up -d

# Show container status
!docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}"

[33mWARN[0m[0000] /Users/ss/realtime-store/docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion 
[1A[1B[0G[?25l[+] Running 1/3
 [32m✔[0m Network realtime-store_default  [32mCre...[0m                                  [34m0.0s [0m
 [33m⠋[0m Container es                    Creating                                [34m0.0s [0m
 [33m⠋[0m Container kafka                 Creating                                [34m0.0s [0m
[?25h[1A[1A[1A[1A[0G[?25l[+] Running 4/4
 [32m✔[0m Network realtime-store_default  [32mCre...[0m                                  [34m0.0s [0m
 [32m✔[0m Container es                    [32mCreated[0m                                 [34m0.0s [0m
 [32m✔[0m Container kafka                 [32mCreated[0m                                 [34m0.0s [0m
 [32m✔[0m Container kibana                [32mCreated[0m                                 [34m0.1s [0m
[?25h[1A

In [1]:
# Stop any old containers
!docker compose down

# Check Python + install dependencies
import platform, sys
print("Python version:", platform.python_version())
print("Python executable:", sys.executable)

%pip install --upgrade pip
%pip install confluent-kafka elasticsearch requests

[33mWARN[0m[0000] /Users/ss/realtime-store/docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion 
[1A[1B[0G[?25l[+] Running 0/2
 [33m⠋[0m Container kibana  Stopping                                              [34m0.1s [0m
 [33m⠋[0m Container kafka   Stopping                                              [34m0.1s [0m
[?25h[1A[1A[1A[0G[?25l[+] Running 0/2
 [33m⠙[0m Container kibana  Stopping                                              [34m0.2s [0m
 [33m⠙[0m Container kafka   Stopping                                              [34m0.2s [0m
[?25h[1A[1A[1A[0G[?25l[+] Running 0/2
 [33m⠹[0m Container kibana  Stopping                                              [34m0.3s [0m
 [33m⠹[0m Container kafka   Stopping                                              [34m0.3s [0m
[?25h[1A[1A[1A[0G[?25l[+] Running 0/2
 [33m⠸[0m Container kibana  Stopping                                 

In [3]:
from confluent_kafka.admin import AdminClient, NewTopic

# Connect to Kafka
admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Define two topics: one for trade orders, one for market ticks
topics = [
    NewTopic("trade_orders", num_partitions=1, replication_factor=1),
    NewTopic("market_ticks", num_partitions=1, replication_factor=1),
]

# Create topics
fs = admin.create_topics(topics)

# Confirm creation
for t, f in fs.items():
    try:
        f.result()
        print(f"Topic '{t}' created successfully")
    except Exception as e:
        if "Topic already exists" in str(e):
            print(f"Topic '{t}' already exists (reusing it)")
        else:
            print(f"Failed to create topic '{t}': {e}")

Failed to create topic 'trade_orders': KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'trade_orders' already exists."}
Topic 'market_ticks' created successfully


In [6]:
# Producer A — burst 400 trade orders into Kafka

from confluent_kafka import Producer
import json, random, time, uuid

# Kafka producer config (keep idempotence False for now so it starts immediately)
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "enable.idempotence": False,
    "acks": "all",
    "linger.ms": 5,
    "batch.size": 32768
})

# Synthetic market parameters
SYMBOLS = ["AAPL", "MSFT", "TSLA", "NVDA", "BAC", "RY", "TD", "BNS"]
SIDES   = ["BUY", "SELL"]
TYPES   = ["NEW", "CANCEL", "EXECUTE"]  # skewed below

def make_order():
    """Build one fake trade/order event."""
    return {
        "event_type": random.choices(TYPES, weights=[70, 10, 20])[0],
        "order_id": str(uuid.uuid4()),
        "symbol": random.choice(SYMBOLS),
        "side":   random.choice(SIDES),
        "price":  round(random.uniform(10, 500), 2),
        "qty":    random.choice([10, 25, 50, 100, 200]),
        "ts":     time.time(),
    }

TOPIC = "trade_orders"
N = 400  # how many to send in this burst

for i in range(N):
    evt = make_order()
    producer.produce(
        TOPIC,
        key=evt["order_id"].encode(),
        value=json.dumps(evt).encode()
    )
    producer.poll(0)           # serve delivery callbacks, keep buffers flowing
    if (i + 1) % 50 == 0:
        print(f"sent {i+1}/{N}")
    time.sleep(0.01)           # tiny pacing so logs are readable

producer.flush(10)
print("burst done")

sent 50/400
sent 100/400
sent 150/400
sent 200/400
sent 250/400
sent 300/400
sent 350/400
sent 400/400
burst done


In [None]:
from confluent_kafka import Producer

print("micro-test: producing one message...")
p = Producer({"bootstrap.servers": "localhost:9092", "enable.idempotence": False})

def cb(err, msg):
    if err:
        print("delivery error:", err)
    else:
        print(f"delivered to {msg.topic()}[{msg.partition()}] @ offset {msg.offset()}")

p.produce("trade_orders", key=b"TEST", value=b'{"ping":1}', on_delivery=cb)
p.flush(10)   # wait up to 10s for the delivery report
print("micro-test done")

In [None]:
import requests

trade_mappings = {
  "properties": {
    "@timestamp": {"type": "date"},
    "event_type": {"type": "keyword"},
    "order_id":   {"type": "keyword"},
    "symbol":     {"type": "keyword"},
    "side":       {"type": "keyword"},
    "price":      {"type": "double"},
    "qty":        {"type": "integer"}
  }
}

tick_mappings = {
  "properties": {
    "@timestamp": {"type": "date"},
    "symbol":     {"type": "keyword"},
    "last":       {"type": "double"},
    "bid":        {"type": "double"},
    "ask":        {"type": "double"},
    "vol":        {"type": "integer"}
  }
}

def create_index_http(name, mappings):
    r = requests.put(f"http://localhost:9200/{name}", json={"mappings": mappings}, timeout=10)
    print(name, r.status_code, r.text[:200])

create_index_http("trade-orders", trade_mappings)
create_index_http("market-ticks", tick_mappings)

In [7]:
from confluent_kafka import Consumer, KafkaException
from elasticsearch import helpers
import json, datetime, requests

# Connect Elasticsearch via REST API
ES_HOST = "http://localhost:9200"

# Configure Kafka consumer
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "py-es-consumer",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False
})

topics = ["trade_orders", "market_ticks"]
consumer.subscribe(topics)
print(f"Subscribed to: {topics}")

BATCH = 20
buf = []

def to_es_action(msg):
    topic = msg.topic()
    src = json.loads(msg.value())

    # convert timestamp if present
    if "ts" in src:
        src["@timestamp"] = datetime.datetime.utcfromtimestamp(src["ts"]).isoformat(timespec="milliseconds") + "Z"

    # choose index name
    index = "trade-orders" if topic == "trade_orders" else "market-ticks"
    doc_id = f"{topic}-{msg.partition()}-{msg.offset()}"

    return {"_index": index, "_id": doc_id, "_source": src}

try:
    print("Consuming... press stop/interrupt to end")
    while True:
        m = consumer.poll(0.5)
        if m is None:
            continue
        if m.error():
            raise KafkaException(m.error())
        buf.append(to_es_action(m))

        if len(buf) >= BATCH:
            r = requests.post(f"{ES_HOST}/_bulk", 
                              data="\n".join(json.dumps(a) for a in buf) + "\n",
                              headers={"Content-Type": "application/x-ndjson"})
            print("Flushed", len(buf), "docs ->", r.status_code)
            buf.clear()
            consumer.commit()

except KeyboardInterrupt:
    print("Stopping consumer...")

finally:
    consumer.close()
    print("Consumer closed")

Subscribed to: ['trade_orders', 'market_ticks']
Consuming... press stop/interrupt to end


  src["@timestamp"] = datetime.datetime.utcfromtimestamp(src["ts"]).isoformat(timespec="milliseconds") + "Z"


Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 docs -> 400
Flushed 20 

%4|1757510233.089|SESSTMOUT|rdkafka#consumer-4| [thrd:main]: Consumer group session timed out (in join-state steady) after 900354 ms without a successful response from the group coordinator (broker 1, last error was Success): revoking assignment and rejoining group
%4|1757512479.258|SESSTMOUT|rdkafka#consumer-4| [thrd:main]: Consumer group session timed out (in join-state steady) after 1054271 ms without a successful response from the group coordinator (broker 1, last error was Success): revoking assignment and rejoining group


Flushed 20 docs -> 400


%5|1757513275.942|REQTMOUT|rdkafka#consumer-4| [thrd:localhost:9092/1]: localhost:9092/1: Timed out FetchRequest in flight (after 704677ms, timeout #0)
%4|1757513275.942|REQTMOUT|rdkafka#consumer-4| [thrd:localhost:9092/1]: localhost:9092/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%4|1757513275.942|SESSTMOUT|rdkafka#consumer-4| [thrd:main]: Consumer group session timed out (in join-state steady) after 705153 ms without a successful response from the group coordinator (broker 1, last error was Success): revoking assignment and rejoining group
%4|1757513735.388|SESSTMOUT|rdkafka#consumer-4| [thrd:main]: Consumer group session timed out (in join-state steady) after 375526 ms without a successful response from the group coordinator (broker 1, last error was Success): revoking assignment and rejoining group
%5|1757513735.388|REQTMOUT|rdkafka#consumer-4| [thrd:localhost:9092/1]: localhost:9092/1: Timed out FetchRequest in flight (after 375035ms, timeout 

Stopping consumer...
Consumer closed


In [13]:
import requests, json

for idx in ["trade_orders", "market_ticks"]:
    # Count documents
    c = requests.get(f"http://localhost:9200/{idx}/_count").json()
    print(idx, "count:", c.get("count"))

    # Show one sample doc
    sample = requests.get(
        f"http://localhost:9200/{idx}/_search",
        params={"size": 1, "pretty": "true"}
    ).text
    print(sample[:500], "...\n")

trade_orders count: None
{
  "error" : {
    "root_cause" : [
      {
        "type" : "index_not_found_exception",
        "reason" : "no such index [trade_orders]",
        "resource.type" : "index_or_alias",
        "resource.id" : "trade_orders",
        "index_uuid" : "_na_",
        "index" : "trade_orders"
      }
    ],
    "type" : "index_not_found_exception",
    "reason" : "no such index [trade_orders]",
    "resource.type" : "index_or_alias",
    "resource.id" : "trade_orders",
    "index_uuid" : "_na_",
    ...

market_ticks count: None
{
  "error" : {
    "root_cause" : [
      {
        "type" : "index_not_found_exception",
        "reason" : "no such index [market_ticks]",
        "resource.type" : "index_or_alias",
        "resource.id" : "market_ticks",
        "index_uuid" : "_na_",
        "index" : "market_ticks"
      }
    ],
    "type" : "index_not_found_exception",
    "reason" : "no such index [market_ticks]",
    "resource.type" : "index_or_alias",
    "resou

In [12]:
import requests, json

doc = {
    "event_type": "TEST",
    "symbol": "AAPL",
    "price": 123.45,
    "ts": "2025-09-10T12:00:00Z"
}

r = requests.post("http://localhost:9200/trade-orders/_doc", 
                  headers={"Content-Type": "application/json"},
                  data=json.dumps(doc))

print(r.status_code, r.text)

201 {"_index":"trade-orders","_id":"1P2qNJkBELe5nx9sEYiN","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1}
