Skip to content

Commit

Permalink
chore: use eventmsg-adaptor library
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianLusina committed Oct 24, 2023
1 parent 0230e8d commit a49e9aa
Show file tree
Hide file tree
Showing 19 changed files with 1,533 additions and 220 deletions.
19 changes: 0 additions & 19 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,6 @@ format:
lint:
pylint app

########################################################################################################################
# Generation and build commands
########################################################################################################################

# Builds the buf module
buf-build:
buf build
.PHONY: buf-build

# updates the buf module
buf-update:
buf mod update
.PHONY: buf-update

# generates protobuf messages
buf-generate:
buf generate
.PHONY: buf-generate

########################################################################################################################
# Migration commands
########################################################################################################################
Expand Down
2 changes: 1 addition & 1 deletion app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
import app.api.sms.routes
from app.api.sms.routes import router as sms_router
from app.api.monitoring.routes import router as monitoring_router
from app.config.di.container import ApplicationContainer
Expand All @@ -17,6 +16,7 @@
from app.infra.telemetry.otel.metrics import initialize_metrics
from app.infra.telemetry.otel.traces import initialize_traces
from app.infra.telemetry.prometheus import setup_prometheus_client
import app.api.sms.routes
from .settings import get_config

logging.root.setLevel(logging.INFO)
Expand Down
24 changes: 13 additions & 11 deletions app/adapters/broker/producers/sms_received_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,35 @@
from tenacity import retry, stop_after_attempt, stop_after_delay, wait_exponential
import sanctumlabs.messageschema.events.notifications.sms.v1.events_pb2 as events
import sanctumlabs.messageschema.events.notifications.sms.v1.data_pb2 as sms_data
from eventmsg_adaptor.event_streams import AsyncEventStream

from app.core.infra.producer import Producer
from app.infra.logger import log as logger
from app.domain.entities.sms import Sms
from app.infra.broker.kafka.producers import KafkaProducer
from app.infra.broker.kafka.message import ProducerMessage


class SmsReceivedProducer(Producer):
"""
    SMS Received Producer handles producing SmsReceived events using a Kafka producer to Kafka Broker Cluster
"""

def __init__(self, topic: str, kafka_producer: KafkaProducer):
def __init__(self, topic: str, event_stream: AsyncEventStream):
        """
        Creates an instance of an SMS received producer with a topic to send events to and an AsyncEventStream to
        use to publish events.
Args:
topic (str): Topic to send message to
kafka_producer (KafkaProducer): Kafka Producer client to use
topic (str): Topic to send message to.
            event_stream (AsyncEventStream): Async event stream used to publish events
"""
self.topic = topic
self.kafka_producer = kafka_producer
self.event_stream = event_stream

@retry(reraise=True, stop=(stop_after_attempt(3) | stop_after_delay(10)),
wait=wait_exponential(multiplier=1, min=3, max=5))
def publish_message(self, sms: Sms):
@retry(
reraise=True,
stop=(stop_after_attempt(3) | stop_after_delay(10)),
wait=wait_exponential(multiplier=1, min=3, max=5),
)
async def publish_message(self, sms: Sms):
try:
data = sms_data.Sms(
id=sms.id.value,
Expand All @@ -38,8 +41,7 @@ def publish_message(self, sms: Sms):
message=sms.message.value,
)
event = events.SmsReceived(sms=data)
message = ProducerMessage(topic=self.topic, value=event)
self.kafka_producer.produce(message=message)
await self.event_stream.publish(destination=self.topic, event_body=event)
except Exception as e:
logger.error(f"{self.producer_name}> Failed to publish message Err: {e}")
raise e
20 changes: 4 additions & 16 deletions app/api/dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Base Data Transfer Objects
"""
from typing import Generic, TypeVar, Optional
from pydantic.generics import GenericModel
from pydantic import BaseModel, ConfigDict

DataT = TypeVar("DataT")

Expand All @@ -23,21 +23,15 @@ def __init__(


# pylint: disable=too-few-public-methods
class ApiResponse(GenericModel, Generic[DataT]):
class ApiResponse(BaseModel, Generic[DataT]):
"""
Represents a successful ApiResponse sent back to a client
"""

status: int = 200
data: Optional[DataT]
message: Optional[str] = None

class Config:
"""
ApiResponse Config
"""

schema_extra = {"example": {"status": 200}}
model_config = ConfigDict(json_schema_extra={"example": {"status": 200}})


# pylint: disable=too-few-public-methods
Expand All @@ -47,10 +41,4 @@ class BadRequest(ApiResponse):
"""

status: int = 400

class Config:
"""
BadRequest Config
"""

schema_extra = {"example": {"status": 400, "message": "Invalid JSON"}}
model_config = ConfigDict(json_schema_extra={"example": {"status": 400, "message": "Invalid JSON"}})
10 changes: 6 additions & 4 deletions app/api/sms/dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@
"""
from typing import List

from pydantic import BaseModel, validator
from pydantic import field_validator, BaseModel


class SmsRequestDto(BaseModel):
"""
SMS Request Payload DTO
"""

sender: str | None
sender: str | None = None
recipient: str
message: str

@validator("message")
@field_validator("message")
@classmethod
def message_must_be_valid(cls, m: str):
"""
Validates message
Expand All @@ -41,7 +42,8 @@ class BulkSmsRequestDto(BaseModel):
recipients: List[str]
message: str

@validator("message")
@field_validator("message")
@classmethod
def message_must_be_valid(cls, m: str):
"""
Validates message
Expand Down
6 changes: 3 additions & 3 deletions app/config/di/container.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dependency_injector import containers, providers
from .gateways_container import GatewaysContainer
from .kafka_container import KafkaContainer
from .event_stream_container import EventStreamContainer
from .services_container import ServicesContainer
from .repository_container import RepositoryContainer
from .domain_container import DomainContainer
Expand All @@ -11,9 +11,9 @@ class ApplicationContainer(containers.DeclarativeContainer):
Application container wiring all dependencies together
"""

gateways = providers.Container(GatewaysContainer)
kafka = providers.Container(EventStreamContainer)

kafka = providers.Container(KafkaContainer)
gateways = providers.Container(GatewaysContainer)

services = providers.Container(ServicesContainer, gateways=gateways, kafka_container=kafka)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
"""
Kafka DI container
"""
from typing import cast

from dependency_injector import containers, providers
import sanctumlabs.messageschema.events.notifications.sms.v1.events_pb2 as events
from app.infra.broker.kafka.config import KafkaSchemaRegistryConfig, KafkaProducerConfig, KafkaConsumerConfig
from eventmsg_adaptor.config.kafka import KafkaConfig, KafkaSchemaRegistryConfig, KafkaSecurityProtocolConfig
from eventmsg_adaptor.event_streams import AsyncEventStream
from eventmsg_adaptor import factory
from eventmsg_adaptor.config import Config, AdapterConfigs

from app.infra.broker.kafka.config import KafkaProducerConfig, KafkaConsumerConfig
from app.infra.broker.kafka.producers.simple_producer import KafkaSimpleProducer
from app.infra.broker.kafka.producers.proto_producer import KafkaProtoProducer
from app.infra.broker.kafka.consumers.proto_consumer import KafkaProtoConsumer
Expand All @@ -13,22 +20,36 @@
from app.settings import KafkaSettings


class KafkaContainer(containers.DeclarativeContainer):
class EventStreamContainer(containers.DeclarativeContainer):
"""
Dependency Injector container for Kafka
    Dependency Injector container for the event stream adapter
"""

config = providers.Configuration(pydantic_settings=[KafkaSettings()])
config.from_pydantic(KafkaSettings())
kafka_config = providers.Configuration(pydantic_settings=[KafkaSettings()])
# TODO: load from env
# kafka_config.from_pydantic(KafkaSettings())

config = Config(
service_name="ujumbe",
default_adapter="kafka",
adapters=AdapterConfigs(
kafka=KafkaConfig(
# bootstrap_servers=[kafka_config.kafka_bootstrap_servers()],
                # schema_registry=KafkaSchemaRegistryConfig(
# schema_registry_url=kafka_config.kafka_schema_registry()
# )
)
)
)
kafka_event_stream = cast(AsyncEventStream, factory(config, adapter_name="aiokafka"))

schema_registry = providers.Singleton(
KafkaRegistry,
params=KafkaSchemaRegistryConfig(url=config.kafka_schema_registry())
params=KafkaSchemaRegistryConfig(url=kafka_config.kafka_schema_registry())
)

simple_producer_client = providers.Singleton(
KafkaSimpleProducer,
params=KafkaProducerConfig(bootstrap_servers=config.kafka_bootstrap_servers())
params=KafkaProducerConfig(bootstrap_servers=kafka_config.kafka_bootstrap_servers())
)

# Sms Received serializer, deserializer, producer & consumer
Expand All @@ -46,15 +67,15 @@ class KafkaContainer(containers.DeclarativeContainer):

sms_received_protobuf_producer = providers.Singleton(
KafkaProtoProducer,
params=KafkaProducerConfig(bootstrap_servers=config.kafka_bootstrap_servers()),
params=KafkaProducerConfig(bootstrap_servers=kafka_config.kafka_bootstrap_servers()),
serializer=sms_received_protobuf_serializer
)

sms_received_protobuf_consumer = providers.Singleton(
KafkaProtoConsumer,
params=KafkaConsumerConfig(bootstrap_servers=config.kafka_bootstrap_servers(),
topic=config.sms_received_topic(),
group_id=config.sms_received_group_id()),
params=KafkaConsumerConfig(bootstrap_servers=kafka_config.kafka_bootstrap_servers(),
topic=kafka_config.sms_received_topic(),
group_id=kafka_config.sms_received_group_id()),
deserializer=sms_received_protobuf_deserializer
)

Expand All @@ -73,15 +94,15 @@ class KafkaContainer(containers.DeclarativeContainer):

sms_submitted_protobuf_producer = providers.Singleton(
KafkaProtoProducer,
params=KafkaProducerConfig(bootstrap_servers=config.kafka_bootstrap_servers()),
params=KafkaProducerConfig(bootstrap_servers=kafka_config.kafka_bootstrap_servers()),
serializer=sms_submitted_protobuf_serializer
)

sms_submitted_protobuf_consumer = providers.Singleton(
KafkaProtoConsumer,
params=KafkaConsumerConfig(bootstrap_servers=config.kafka_bootstrap_servers(),
topic=config.sms_submitted_topic(),
group_id=config.sms_submitted_group_id()),
params=KafkaConsumerConfig(bootstrap_servers=kafka_config.kafka_bootstrap_servers(),
topic=kafka_config.sms_submitted_topic(),
group_id=kafka_config.sms_submitted_group_id()),
deserializer=sms_submitted_protobuf_deserializer
)

Expand All @@ -100,15 +121,15 @@ class KafkaContainer(containers.DeclarativeContainer):

sms_sent_protobuf_producer = providers.Singleton(
KafkaProtoProducer,
params=KafkaProducerConfig(bootstrap_servers=config.kafka_bootstrap_servers()),
params=KafkaProducerConfig(bootstrap_servers=kafka_config.kafka_bootstrap_servers()),
serializer=sms_sent_protobuf_serializer
)

sms_sent_protobuf_consumer = providers.Singleton(
KafkaProtoConsumer,
params=KafkaConsumerConfig(bootstrap_servers=config.kafka_bootstrap_servers(),
topic=config.sms_sent_topic(),
group_id=config.sms_sent_group_id()),
params=KafkaConsumerConfig(bootstrap_servers=kafka_config.kafka_bootstrap_servers(),
topic=kafka_config.sms_sent_topic(),
group_id=kafka_config.sms_sent_group_id()),
deserializer=sms_sent_protobuf_deserializer
)

Expand All @@ -127,15 +148,15 @@ class KafkaContainer(containers.DeclarativeContainer):

sms_callback_received_protobuf_producer = providers.Singleton(
KafkaProtoProducer,
params=KafkaProducerConfig(bootstrap_servers=config.kafka_bootstrap_servers()),
params=KafkaProducerConfig(bootstrap_servers=kafka_config.kafka_bootstrap_servers()),
serializer=sms_callback_received_protobuf_serializer
)

sms_callback_received_protobuf_consumer = providers.Singleton(
KafkaProtoConsumer,
params=KafkaConsumerConfig(bootstrap_servers=config.kafka_bootstrap_servers(),
topic=config.sms_callback_received_topic(),
group_id=config.sms_callback_received_group_id()),
params=KafkaConsumerConfig(bootstrap_servers=kafka_config.kafka_bootstrap_servers(),
topic=kafka_config.sms_callback_received_topic(),
group_id=kafka_config.sms_callback_received_group_id()),
deserializer=sms_callback_received_protobuf_deserializer
)

Expand Down
6 changes: 4 additions & 2 deletions app/config/di/gateways_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ class GatewaysContainer(containers.DeclarativeContainer):
"""

db_config = providers.Configuration(pydantic_settings=[DatabaseSettings()])
db_config.from_pydantic(DatabaseSettings())
# TODO: load settings from env directly
# db_config.from_pydantic(settings=DatabaseSettings())

twilio_sms_config = providers.Configuration(pydantic_settings=[TwilioSmsClientSettings()])
twilio_sms_config.from_pydantic(TwilioSmsClientSettings())
# TODO: load settings from env directly
# twilio_sms_config.from_pydantic(TwilioSmsClientSettings())

database_client = providers.Singleton(
DatabaseClient,
Expand Down
Loading

0 comments on commit a49e9aa

Please sign in to comment.