In [9]:
import random
import uuid

import structlog
from confluent_kafka import avro
from kafkian import Producer
from kafkian.serde.avroserdebase import AvroRecord
from kafkian.serde.serialization import AvroSerializer, AvroStringKeySerializer
from time import sleep

In [10]:
# Notebook-wide structlog logger; every cell below logs through this instance.
logger = structlog.get_logger(__name__)


## Define the Avro schema and configure the producer

In [11]:
# Avro schema (as JSON text) for the `locations.LocationReceived` record:
# a string `deviceId` plus `latitude` and `longitude` fields typed as `float`.
# NOTE(review): Avro `float` is single precision per the Avro spec — confirm
# that resolution is sufficient for coordinates, otherwise consider `double`.
value_schema_str = """
{
   "namespace": "locations",
   "name": "LocationReceived",
   "type": "record",
   "fields" : [
     {
       "name" : "deviceId",
       "type" : "string"
     },
     {
       "name" : "latitude",
       "type" : "float"
     },
     {
       "name" : "longitude",
       "type" : "float"
     }
   ]
}
"""

class LocationReceived(AvroRecord):
    """Record type for a device location event.

    Instances are created from a mapping with the keys ``deviceId``,
    ``latitude`` and ``longitude`` (see ``produce_location_received``).
    """

    # Schema object obtained by passing `value_schema_str` to `avro.loads`;
    # presumably this is what the Avro value serializer reads — TODO confirm.
    _schema = avro.loads(value_schema_str)

In [12]:
# Connection endpoints used throughout the notebook. Despite its name, this
# mapping carries both the Kafka bootstrap address and the schema registry URL.
SCHEMA_REGISTRY_CONFIG = dict(
    KAFKA_BOOTSTRAP_SERVERS='localhost:29092',
    SCHEMA_REGISTRY_URL='http://localhost:8081',
)

# Settings handed to the producer; only the broker address is overridden here.
PRODUCER_CONFIG = {'bootstrap.servers': SCHEMA_REGISTRY_CONFIG['KAFKA_BOOTSTRAP_SERVERS']}

# Each serializer gets its own instance, both pointed at the same registry URL.
producer = Producer(
    PRODUCER_CONFIG,
    key_serializer=AvroStringKeySerializer(SCHEMA_REGISTRY_CONFIG['SCHEMA_REGISTRY_URL']),
    value_serializer=AvroSerializer(SCHEMA_REGISTRY_CONFIG['SCHEMA_REGISTRY_URL']),
)

2019-05-27 22:14.51 Initializing producer          config={'acks': 'all', 'api.version.request': True, 'client.id': 'Allars-MBP-2', 'log.connection.close': False, 'max.in.flight': 1, 'queue.buffering.max.ms': 100, 'statistics.interval.ms': 15000, 'bootstrap.servers': 'localhost:29092'}


In [13]:
def produce_location_received(device_id: str, latitude: float, longitude: float):
    """Send one ``LocationReceived`` event to the ``location_ingress`` topic.

    The event is keyed by ``device_id`` and produced with ``sync=True``.
    An exception raised by the produce call is reported through
    ``logger.exception`` and not re-raised; errors raised while building the
    record are not caught here.

    Args:
        device_id: Identifier of the reporting device; also used as the key.
        latitude: Latitude value stored in the record.
        longitude: Longitude value stored in the record.
    """
    payload = {
        'deviceId': device_id,
        'latitude': latitude,
        'longitude': longitude,
    }
    message = LocationReceived(payload)

    # Log the target endpoints together with the record before sending.
    bootstrap_servers = SCHEMA_REGISTRY_CONFIG.get('KAFKA_BOOTSTRAP_SERVERS')
    registry_url = SCHEMA_REGISTRY_CONFIG.get('SCHEMA_REGISTRY_URL')
    announcement = "Sending to {} {} message: {}".format(bootstrap_servers, registry_url, message)
    logger.msg(announcement)

    try:
        producer.produce('location_ingress', device_id, message, sync=True)
    except Exception as error:
        # Best effort: report the failure and carry on so the notebook keeps running.
        failure = "Failed to produce LocationReceived event: {}".format(error)
        logger.exception(failure)

## Produce message

In [16]:
# Three freshly generated device ids per run; one of them is picked at random.
devices = [str(uuid.uuid4()) for _ in range(3)]
device = random.choice(devices)

# Offset a fixed base coordinate by a random amount in [0, 0.01) on each axis
# so that successive runs report slightly different positions.
produce_location_received(
    device_id=device,
    latitude=59.3363 + random.random() / 100,
    longitude=18.0262 + random.random() / 100,
)

2019-05-27 22:16.44 Sending to localhost:29092 http://localhost:8081 message: {'deviceId': '3b8fe5fa-1746-40fe-a9c0-38d0496de559', 'latitude': 59.339148965612516, 'longitude': 18.033505655898207}
2019-05-27 22:16.44 Flushing producer
