In [None]:
# Required Python libraries
!pip install Flask redis kafka-python

In [None]:
# utils.py
import redis
import hashlib
import json

# Configuration for the distributed cache.
# In a real-world deployment you would use Redis Cluster or Sentinel; here
# sharding is simulated client-side by hashing the key (see get_redis_node).
# Nodes come from the REDIS_NODES environment variable: a comma-separated list
# of "host" or "host:port" entries (docker-compose sets
# "redis-node-1,redis-node-2"). The hard-coded list below is used only when the
# variable is unset or empty. Every process must see the same nodes in the same
# order, otherwise they disagree on which node owns a key.
import os

_DEFAULT_REDIS_PORT = 6379


def _parse_redis_nodes(spec, default_port=_DEFAULT_REDIS_PORT):
    """Parse "host[:port],host[:port],..." into a list of redis.Redis kwargs dicts.

    Blank entries are ignored; an entry without ":port" gets ``default_port``.
    Returns an empty list for an empty/blank ``spec``.
    """
    nodes = []
    for entry in spec.split(','):
        entry = entry.strip()
        if not entry:
            continue
        host, sep, port = entry.rpartition(':')
        if not sep:
            host, port = entry, default_port
        nodes.append({'host': host, 'port': int(port)})
    return nodes


REDIS_NODES = _parse_redis_nodes(os.environ.get('REDIS_NODES', '')) or [
    {'host': 'redis-node-1', 'port': 6379},
    {'host': 'redis-node-2', 'port': 6379},
]
redis_connections = [redis.Redis(**node) for node in REDIS_NODES]

def get_redis_node(key):
    """Pick the Redis client that owns ``key``.

    The MD5 digest of the key, read as a big-endian integer, is reduced modulo
    the number of configured connections. Because this is plain modulo hashing,
    changing the number of nodes remaps most keys to a different node.
    """
    digest = hashlib.md5(key.encode()).digest()
    shard_index = int.from_bytes(digest, 'big') % len(redis_connections)
    return redis_connections[shard_index]

def set_with_lru_cache(key, value, expiry=600):
    """Store ``value`` as JSON under ``key`` on its shard with a TTL.

    Args:
        key: cache key; also decides which Redis node receives the write.
        value: any JSON-serialisable object.
        expiry: time-to-live in seconds passed to SETEX (default 600).

    Despite the name, no eviction bookkeeping happens here: the function only
    issues SETEX. Any LRU eviction comes from the Redis server's own
    ``maxmemory`` / ``maxmemory-policy`` configuration.
    """
    shard = get_redis_node(key)
    serialized = json.dumps(value)
    shard.setex(key, expiry, serialized)

def get_from_lru_cache(key):
    """Fetch and JSON-decode the cached value for ``key``.

    Returns:
        The decoded object, or None when the owning node has no such key
        (an empty stored value is also reported as None).
    """
    raw = get_redis_node(key).get(key)
    return json.loads(raw) if raw else None


In [None]:
# log_ingestion_api.py
from flask import Flask, request, jsonify
from kafka import KafkaProducer
import json
import uuid

import os

app = Flask(__name__)

# Kafka producer setup.
# Brokers are read from KAFKA_BROKERS, a comma-separated "host:port" list
# (the docker-compose file sets "kafka:29092"). The hard-coded hosts below are
# not defined in that compose file, so they are only a fallback used when the
# variable is unset or empty.
KAFKA_BROKERS = [
    broker.strip()
    for broker in os.environ.get('KAFKA_BROKERS', '').split(',')
    if broker.strip()
] or ['kafka-broker-1:9092', 'kafka-broker-2:9092']
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.route('/ingest', methods=['POST'])
def ingest_log():
    """Accept one JSON log event and enqueue it on the 'logs' Kafka topic.

    The request body must be a non-empty JSON object. A fresh ``event_id``
    (UUID4 string) is added, overwriting any client-supplied value.

    Returns:
        (status JSON, 202) once the event has been handed to the producer, or
        (error JSON, 400) when the body is missing, not valid JSON, sent with a
        non-JSON Content-Type, not a JSON object, or an empty object.
    """
    # silent=True: malformed JSON / wrong Content-Type yields None instead of
    # Flask raising its own error, so every bad body gets the same 400 below.
    log_event = request.get_json(silent=True)
    # Lists, strings and numbers are valid JSON but cannot carry an event_id;
    # without this check the item assignment below raises and Flask returns 500.
    if not isinstance(log_event, dict) or not log_event:
        return jsonify({"error": "Invalid request"}), 400

    # Add a unique ID to the event
    event_id = str(uuid.uuid4())
    log_event['event_id'] = event_id

    # send() only buffers the record and returns a future, so "queued" does not
    # mean the broker acknowledged it. Attach an errback so delivery failures
    # are logged instead of disappearing silently.
    future = producer.send('logs', log_event)
    future.add_errback(
        lambda exc: app.logger.error("Failed to publish log event %s: %s", event_id, exc)
    )

    return jsonify({"status": "success", "message": "Log event queued"}), 202

if __name__ == '__main__':
    # Run Flask's development server bound to all interfaces, so the port
    # published by docker-compose (5000) is reachable from outside the container.
    app.run(port=5000, host='0.0.0.0')

In [None]:
# log_worker.py
from kafka import KafkaConsumer
import json
import time
from utils import set_with_lru_cache

# Placeholder for the write path into a sharded, B+ tree-indexed database.
# A real system would replace this with an ORM/driver call (e.g. PostgreSQL).
def write_to_database(log_event):
    """Pretend to persist ``log_event`` and report success.

    A real implementation would pick the target shard from a key such as the
    event_id or timestamp. This stub only prints the id (raising KeyError if
    'event_id' is absent) and always returns True.
    """
    event_id = log_event['event_id']
    print(f"Writing log event to DB: {event_id}")
    return True

import os

# Kafka consumer setup.
# KAFKA_BROKERS: comma-separated "host:port" list (docker-compose sets
# "kafka:29092"); the hard-coded list is only a fallback for an unset/empty var.
# KAFKA_GROUP_ID: optional consumer group. With a group, kafka-python commits
# offsets (auto-commit is on by default), so a restarted worker resumes where it
# stopped, and a brand-new group starts from the earliest retained record
# instead of skipping events published before the worker came up. Without a
# group, offsets are not committed and the client default ('latest') is kept.
KAFKA_BROKERS = [
    broker.strip()
    for broker in os.environ.get('KAFKA_BROKERS', '').split(',')
    if broker.strip()
] or ['kafka-broker-1:9092', 'kafka-broker-2:9092']
KAFKA_GROUP_ID = os.environ.get('KAFKA_GROUP_ID') or None
consumer = KafkaConsumer(
    'logs',
    bootstrap_servers=KAFKA_BROKERS,
    group_id=KAFKA_GROUP_ID,
    auto_offset_reset='earliest' if KAFKA_GROUP_ID else 'latest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

from redis.exceptions import RedisError

print("Log worker listening for events...")
for message in consumer:
    log_event = message.value

    # The ingestion API always sets 'event_id', but any other producer on the
    # topic might not; such a record would otherwise raise and stop the worker.
    if not isinstance(log_event, dict) or 'event_id' not in log_event:
        print(f"Skipping malformed log event (partition {message.partition}, "
              f"offset {message.offset}): {log_event!r}")
        continue

    # Write to the B+ tree-indexed database
    if write_to_database(log_event):
        # Populate the distributed LRU cache for recent logs. The database write
        # has already succeeded, so a Redis failure is reported but not fatal.
        cache_key = f"log:{log_event['event_id']}"
        try:
            set_with_lru_cache(cache_key, log_event, expiry=3600)
        except RedisError as exc:
            print(f"Stored log {log_event['event_id']} but failed to cache it: {exc}")
        else:
            print(f"Processed and cached log {log_event['event_id']}")

In [None]:
# query_service.py
from flask import Flask, request, jsonify
from utils import get_from_lru_cache
import json

# WSGI application object for the read-path API; routes are registered below.
app = Flask(import_name=__name__)

# Stand-in for a read from the sharded database. A real implementation would
# run an indexed lookup there (e.g. through SQLAlchemy).
def query_database_for_log(log_id):
    """Pretend to fetch the log with id ``log_id`` from the database.

    Returns a log dict or None per its contract; this stub always "finds" the
    log and returns a synthetic record carrying the requested id.
    """
    print(f"Cache miss! Querying database for log ID: {log_id}")
    record = {
        "event_id": log_id,
        "message": "This is a log from the database",
        "timestamp": "...",
    }
    return record

@app.route('/query/<log_id>', methods=['GET'])
def get_log(log_id):
    """Return one log event by id using a cache-aside lookup.

    1. Try the distributed cache under key ``log:<log_id>``.
    2. On a miss, query the database and write the result back to the cache.

    Redis errors on either cache step are logged and treated as a miss / no-op,
    so a cache outage degrades to database reads instead of failing requests.

    Returns:
        (event JSON, 200) when found, or (error JSON, 404) when the database
        returns None.
    """
    # Local imports: this module's top-level import only brings in the read helper.
    from redis.exceptions import RedisError
    from utils import set_with_lru_cache

    cache_key = f"log:{log_id}"

    # Step 1: check the distributed cache. Compare with None rather than
    # truthiness so a cached falsy JSON value (e.g. {} or 0) still counts as a hit.
    try:
        log_event = get_from_lru_cache(cache_key)
    except RedisError as exc:
        app.logger.warning("Cache read failed for %s: %s", cache_key, exc)
        log_event = None

    if log_event is not None:
        print("Cache hit!")
        return jsonify(log_event), 200

    # Step 2: cache miss, fall back to the database.
    log_event = query_database_for_log(log_id)
    if log_event is None:
        return jsonify({"error": "Log not found"}), 404

    # Step 3: write back so the next request for this id is served from cache.
    try:
        set_with_lru_cache(cache_key, log_event)
    except RedisError as exc:
        app.logger.warning("Cache write failed for %s: %s", cache_key, exc)
    return jsonify(log_event), 200

if __name__ == '__main__':
    # Run Flask's development server bound to all interfaces, so the port
    # published by docker-compose (5001) is reachable from outside the container.
    app.run(port=5001, host='0.0.0.0')

In [None]:
# Dockerfile
FROM python:3.9-slim

# Send print() output straight to the container log instead of buffering it,
# so the worker's progress messages show up in `docker logs` promptly.
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# Install dependencies before copying the source so this layer stays cached
# across code-only changes. requirements.txt should list Flask, redis and
# kafka-python (the packages installed at the top of the notebook).
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# One image serves every component: choose the entry script with SERVICE_SCRIPT
# (e.g. log_ingestion_api.py, log_worker.py, query_service.py) or override the
# command entirely (docker-compose `command:`). `exec` makes Python PID 1 so it
# receives stop signals directly.
ENV SERVICE_SCRIPT=log_ingestion_api.py
CMD ["sh", "-c", "exec python \"$SERVICE_SCRIPT\""]

In [None]:
# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      # Containers on the compose network use kafka:29092; the host uses localhost:9092.
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Single broker: the consumer-offsets topic cannot use the default
      # replication factor of 3, which would break consumer groups.
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper

  # Without maxmemory Redis never evicts, so the "LRU cache" would only grow.
  # Cap memory (256mb is an example value) and evict least-recently-used keys.
  redis-node-1:
    image: redis:6.2.6
    container_name: redis-node-1
    command: ["redis-server", "--maxmemory", "256mb", "--maxmemory-policy", "allkeys-lru"]

  redis-node-2:
    image: redis:6.2.6
    container_name: redis-node-2
    command: ["redis-server", "--maxmemory", "256mb", "--maxmemory-policy", "allkeys-lru"]

  # All application services share one image, so each names its script
  # explicitly. depends_on only orders container start-up; it does not wait for
  # Kafka/Redis to accept connections, hence restart on failure.
  log-ingestion-api:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: log-ingestion-api
    command: ["python", "log_ingestion_api.py"]
    restart: on-failure
    ports:
      - "5000:5000"
    environment:
      - KAFKA_BROKERS=kafka:29092
    depends_on:
      - kafka

  log-worker:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: log-worker
    command: ["python", "log_worker.py"]
    restart: on-failure
    environment:
      - KAFKA_BROKERS=kafka:29092
      - KAFKA_GROUP_ID=log-worker
      # Must list the same nodes in the same order as query-service (key sharding).
      - REDIS_NODES=redis-node-1,redis-node-2
    depends_on:
      - kafka
      - redis-node-1
      - redis-node-2

  query-service:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: query-service
    command: ["python", "query_service.py"]
    restart: on-failure
    ports:
      - "5001:5001"
    environment:
      # Must list the same nodes in the same order as log-worker (key sharding).
      - REDIS_NODES=redis-node-1,redis-node-2
    depends_on:
      - redis-node-1
      - redis-node-2