In [41]:
import pymongo
import uuid
import json
from datetime import datetime, timezone
from cloudevents.http import CloudEvent
from cloudevents.conversion import to_structured, to_json, to_binary, to_dict

In [42]:
# Client handle for the MongoDB server on this machine; the URI is kept in a
# named constant so it is easy to spot and change.
MONGO_URI = "mongodb://localhost:27017/"
myclient = pymongo.MongoClient(MONGO_URI)

In [43]:
# Start from a clean slate: drop any existing "testdb" left over from an
# earlier run, then take a handle to the database of that name.
DB_NAME = "testdb"
myclient.drop_database(DB_NAME)
mydb = myclient[DB_NAME]


In [44]:
# Handles for the three collections used by this notebook:
# "errors", "dlq" (dead-letter queue) and "events".
error_store, dlq_store, events_store = (
    mydb[collection_name] for collection_name in ("errors", "dlq", "events")
)

In [45]:
# First sample event: a CloudEvents 1.0 envelope with a random UUID as its id
# and the current UTC time as its timestamp, wrapping a small JSON payload.
data = {"hello": "world"}

attributes = dict(
    id=str(uuid.uuid4()),
    source="https://example.com/event-producer",
    type="com.example.someevent",
    time=datetime.now(timezone.utc).isoformat(),
    specversion="1.0",
)

test_cloud_event = CloudEvent(attributes=attributes, data=data)

In [46]:
# Second sample event: same source, type and payload as test_cloud_event, but
# with its own freshly generated id and timestamp.
data2 = {"hello": "world"}

attributes2 = dict(
    id=str(uuid.uuid4()),
    source="https://example.com/event-producer",
    type="com.example.someevent",
    time=datetime.now(timezone.utc).isoformat(),
    specversion="1.0",
)

test_cloud_event2 = CloudEvent(attributes=attributes2, data=data2)

In [47]:
# Serialize the whole event (attributes plus data) into one JSON document,
# returned as bytes.
to_json(test_cloud_event)

b'{"specversion": "1.0", "id": "9159e22f-55c8-46aa-9a16-bf644afd449d", "source": "https://example.com/event-producer", "type": "com.example.someevent", "time": "2025-01-07T03:56:46.100935+00:00", "data": {"hello": "world"}}'

In [48]:
# Structured mode: returns a (headers, body) pair where the headers carry the
# "application/cloudevents+json" content type and the body is the full event
# as JSON bytes.
to_structured(test_cloud_event)

({'content-type': 'application/cloudevents+json'},
 b'{"specversion": "1.0", "id": "9159e22f-55c8-46aa-9a16-bf644afd449d", "source": "https://example.com/event-producer", "type": "com.example.someevent", "time": "2025-01-07T03:56:46.100935+00:00", "data": {"hello": "world"}}')

In [49]:
# Binary mode: returns a (headers, body) pair where each event attribute is a
# "ce-"-prefixed header and the body holds only the event data as JSON bytes.
to_binary(test_cloud_event)

({'ce-specversion': '1.0',
  'ce-id': '9159e22f-55c8-46aa-9a16-bf644afd449d',
  'ce-source': 'https://example.com/event-producer',
  'ce-type': 'com.example.someevent',
  'ce-time': '2025-01-07T03:56:46.100935+00:00'},
 b'{"hello": "world"}')

In [50]:
# Plain-dict view of the event: the attributes plus a "data" key. This is the
# form inserted into MongoDB further down.
to_dict(test_cloud_event)

{'id': '9159e22f-55c8-46aa-9a16-bf644afd449d',
 'source': 'https://example.com/event-producer',
 'type': 'com.example.someevent',
 'time': '2025-01-07T03:56:46.100935+00:00',
 'specversion': '1.0',
 'data': {'hello': 'world'}}

In [51]:
# Store the first sample event in the "dlq" collection as a plain dict.
first_event_doc = to_dict(test_cloud_event)
dlq_store.insert_one(first_event_doc)

InsertOneResult(ObjectId('677ca5fed707548e3a212ecf'), acknowledged=True)

In [52]:
# Store the second sample event in the "dlq" collection as a plain dict.
second_event_doc = to_dict(test_cloud_event2)
dlq_store.insert_one(second_event_doc)

InsertOneResult(ObjectId('677ca5fed707548e3a212ed0'), acknowledged=True)

In [53]:
# List every document currently in the "dlq" collection (empty filter matches
# all). list() drains the cursor directly; no comprehension is needed.
list(dlq_store.find({}))

[{'_id': ObjectId('677ca5fed707548e3a212ecf'),
  'id': '9159e22f-55c8-46aa-9a16-bf644afd449d',
  'source': 'https://example.com/event-producer',
  'type': 'com.example.someevent',
  'time': '2025-01-07T03:56:46.100935+00:00',
  'specversion': '1.0',
  'data': {'hello': 'world'}},
 {'_id': ObjectId('677ca5fed707548e3a212ed0'),
  'id': '449c4eca-e4b1-43b8-878f-8c06348d92b8',
  'source': 'https://example.com/event-producer',
  'type': 'com.example.someevent',
  'time': '2025-01-07T03:56:46.104776+00:00',
  'specversion': '1.0',
  'data': {'hello': 'world'}}]

In [57]:
# Look up the first event by its CloudEvents "id" attribute. The id is read
# from the event object rather than pasted in as a literal: uuid4() produces a
# new value on every run, so a hard-coded id stops matching after a restart.
list(dlq_store.find({"id": test_cloud_event["id"]}))

[{'_id': ObjectId('677ca5fed707548e3a212ecf'),
  'id': '9159e22f-55c8-46aa-9a16-bf644afd449d',
  'source': 'https://example.com/event-producer',
  'type': 'com.example.someevent',
  'time': '2025-01-07T03:56:46.100935+00:00',
  'specversion': '1.0',
  'data': {'hello': 'world'}}]

In [58]:
# Look up the second event by its CloudEvents "id" attribute. The id is read
# from the event object rather than pasted in as a literal: uuid4() produces a
# new value on every run, so a hard-coded id stops matching after a restart.
list(dlq_store.find({"id": test_cloud_event2["id"]}))

[{'_id': ObjectId('677ca5fed707548e3a212ed0'),
  'id': '449c4eca-e4b1-43b8-878f-8c06348d92b8',
  'source': 'https://example.com/event-producer',
  'type': 'com.example.someevent',
  'time': '2025-01-07T03:56:46.104776+00:00',
  'specversion': '1.0',
  'data': {'hello': 'world'}}]

In [62]:
# Query by payload: the filter compares the embedded "data" document as a
# whole against {"hello": "world"}.
# NOTE(review): both sample events were inserted with this exact payload, yet
# the recorded output lists only one document, and the execution counts skip
# 59-61. The collection was presumably modified by a cell that is no longer in
# the notebook -- re-run from a fresh kernel to confirm the expected result.
list(dlq_store.find({"data": {"hello": "world"}}))

[{'_id': ObjectId('677ca5fed707548e3a212ed0'),
  'id': '449c4eca-e4b1-43b8-878f-8c06348d92b8',
  'source': 'https://example.com/event-producer',
  'type': 'com.example.someevent',
  'time': '2025-01-07T03:56:46.104776+00:00',
  'specversion': '1.0',
  'data': {'hello': 'world'}}]

In [64]:
# Combine both criteria: the payload must equal {"hello": "world"} AND the
# "id" must be that of the second event. The id is read from the event object
# because uuid4() yields a different value on every run, so a pasted literal
# would stop matching after a restart.
list(
    dlq_store.find(
        {"data": {"hello": "world"}, "id": test_cloud_event2["id"]}
    )
)

[{'_id': ObjectId('677ca5fed707548e3a212ed0'),
  'id': '449c4eca-e4b1-43b8-878f-8c06348d92b8',
  'source': 'https://example.com/event-producer',
  'type': 'com.example.someevent',
  'time': '2025-01-07T03:56:46.104776+00:00',
  'specversion': '1.0',
  'data': {'hello': 'world'}}]