In [1]:
# Enable IPython's autoreload extension; mode 2 re-imports modules before each
# cell executes, so edits to the imported project code are picked up without a kernel restart.
%load_ext autoreload
%autoreload 2


In [2]:
import avro
from avro_json_serializer import AvroJsonSerializer
from src.pof.producer.send_record import send_record
from src.pof.consumer.consume_record import consume_record

# Simple demo

Produce a message, then consume it.

## Produce a message

In [3]:
# Sample record for the create-user-request topic.
message = dict(
    email="email4@email.com",
    firstName="John",
    lastName="Doe2",
)

In [4]:
# Parse the Avro schema for the create-user request. The file is read inside a
# context manager so the handle is closed right away instead of leaking until
# garbage collection.
with open("avro/create-user-request.avsc", "rb") as avsc_fh:
    schema = avro.schema.parse(avsc_fh.read())

# Serializer used below to turn plain dicts into Avro-flavoured JSON.
serializer = AvroJsonSerializer(schema)

In [5]:
# Keyword arguments for send_record, collected in one literal.
sr = {
    "topic": "create-user-request",
    "schema_file": "create-user-request.avsc",
    "record_value": serializer.to_json(message),
    "bootstrap_servers": "localhost:9092",
    "schema_registry": "http://localhost:8081",
}

In [7]:
# Publish the record; every entry of `sr` is passed to send_record as a keyword argument.
send_record(**sr)

2021-04-13T12:23:34 DEBUG    Starting new HTTP connection (1): localhost:8081
2021-04-13T12:23:34 DEBUG    http://localhost:8081 "POST /subjects/create-user-request-value/versions HTTP/1.1" 200 8
2021-04-13T12:23:34 DEBUG    http://localhost:8081 "POST /subjects/create-user-request-key/versions HTTP/1.1" 200 8
2021-04-13T12:23:34 INFO     Successfully producing record value - {'email': 'email4@email.com', 'firstName': 'John', 'lastName': 'Doe2'} to topic - create-user-request


## Consume the message

Each consumer group reads the topic independently, so the same message can be consumed once per group.

Default consumer

In [9]:
# Poll the topic without passing group_id, so consume_record uses whatever
# default group it defines.
# NOTE(review): the "No new messages" output below presumably means that default
# group had already read past this record — confirm.
consume_record(topic="create-user-request", 
               schema_file="create-user-request.avsc",
               bootstrap_servers="localhost:9092",
               schema_registry="http://localhost:8081")

2021-04-13T12:24:19 INFO     No new messages at this point. Try again later.


Consumer A

In [11]:
# Same poll, but with an explicit group_id ("consumer_a").
consume_record(topic="create-user-request", 
               schema_file="create-user-request.avsc",
               bootstrap_servers="localhost:9092",
               schema_registry="http://localhost:8081",
               group_id="consumer_a")

2021-04-13T12:24:56 DEBUG    Starting new HTTP connection (1): localhost:8081
2021-04-13T12:24:56 DEBUG    http://localhost:8081 "GET /schemas/ids/2 HTTP/1.1" 200 148
2021-04-13T12:24:56 DEBUG    http://localhost:8081 "GET /schemas/ids/3 HTTP/1.1" 200 23
2021-04-13T12:24:56 INFO     Successfully poll a record from Kafka topic: create-user-request, partition: 0, offset: 10
message key: 472b0b54-aeab-4cc7-b57f-c84c8a4a0742 || message value: {'email': 'email4@email.com', 'firstName': 'John', 'lastName': 'Doe2'}


## Data item

In [35]:
# Parse the data-item Avro schema. The file is read inside a context manager so
# the handle is closed right away instead of leaking until garbage collection.
with open("avro/dataitem.avsc", "rb") as avsc_fh:
    schema = avro.schema.parse(avsc_fh.read())

# Serializer bound to the data-item schema (replaces the create-user one above).
serializer = AvroJsonSerializer(schema)

In [36]:
# Show the parsed schema object (the cell output is its repr).
schema

<avro.schema.RecordSchema at 0x7f6e5b00f8e0>

In [37]:
# NOTE(review): disabled draft of a `last_status` field, presumably intended for
# avro/dataitem.avsc — TODO confirm, then move it into the schema file or delete
# this cell.
# NOTE(review): as written, "items" sits next to "type" at field level; the Avro
# spec nests array types as "type": {"type": "array", "items": [...]} — verify
# before re-enabling.
# ,
#     {
#       "name": "last_status",
#       "type": "array",
#       "items" : ["string", "null"],
#       "default": []
#     }

In [49]:
# Sample data-item metric event.
# NOTE(review): the "null" entries are literal strings, not None — confirm that
# is what avro/dataitem.avsc expects for its optional fields.
payload = dict(
    ref_time="2020-01-03T14:00:00Z",
    latency="null",
    service_version="1.2",
    type="created",
    pipeline="PIPELINE",
    duration="null",
    mission="null",
    service="test_process",
    id="2c622351-TEST-DI1",
    event_time="2020-01-03T14:00:00Z",
    ext_event="null",
    mission_subtype="null",
    status="null",
    info="null",
)

In [51]:
# Re-load the data-item schema in this cell so it can be run on its own. The
# file is read inside a context manager so the handle is closed right away
# instead of leaking until garbage collection.
with open("avro/dataitem.avsc", "rb") as avsc_fh:
    schema = avro.schema.parse(avsc_fh.read())
serializer = AvroJsonSerializer(schema)

# Keyword arguments for send_record; the record key is the payload's own id.
di = {
    "topic": "dataitem-metric",
    "schema_file": "dataitem.avsc",
    "record_value": serializer.to_json(payload),
    "bootstrap_servers": "localhost:9092",
    "schema_registry": "http://localhost:8081",
    "record_key": payload["id"],
}

send_record(**di)

2021-04-13T12:59:57 DEBUG    Starting new HTTP connection (1): localhost:8081
2021-04-13T12:59:57 DEBUG    http://localhost:8081 "POST /subjects/dataitem-metric-value/versions HTTP/1.1" 200 8
2021-04-13T12:59:57 DEBUG    http://localhost:8081 "POST /subjects/dataitem-metric-key/versions HTTP/1.1" 200 8
2021-04-13T12:59:57 INFO     Successfully producing record value - {'ref_time': '2020-01-03T14:00:00Z', 'latency': 'null', 'service_version': '1.2', 'type': 'created', 'pipeline': 'PIPELINE', 'duration': 'null', 'mission': 'null', 'service': 'test_process', 'id': '2c622351-TEST-DI1', 'event_time': '2020-01-03T14:00:00Z', 'ext_event': 'null', 'mission_subtype': 'null', 'status': 'null', 'info': 'null'} to topic - dataitem-metric
