Bug: NATS unstable connections #1581

Open
theobouwman opened this issue Jul 10, 2024 · 1 comment
Labels
bug: Something isn't working
NATS: Issues related to `faststream.nats` module and NATS broker features

theobouwman commented Jul 10, 2024

Describe the bug
Our FastStream NATS implementation has produced a lot of errors since we started using it:

Error in NATS: nats: 'Authentication Timeout'
Error in NATS: nats: connection closed
Error in NATS: Connection lost
Error setting key value: nats: no response from stream
Error in NATS: [Errno 104] Connection reset by peer

How to reproduce
Our NATSService, used by our regular FastAPI app:

import asyncio
from enum import Enum
import logging
from opentelemetry import trace
from typing import List, TypeVar
from common.schemas.user import GetMeUserResponse
from faststream.nats import NatsBroker, StorageType, RetentionPolicy, JStream
from nats.js.api import KeyValueConfig
from nats.js.errors import KeyNotFoundError
from pydantic import AwareDatetime, RootModel
import sentry_sdk

from common.config import get_sabo_event_categories
from common.schemas.event import GetEventResponse
from common.schemas.feed import GetFeedItemResponse

from async_lru import alru_cache

T = TypeVar("T")

MOMO_WORKER_DEFAULT = JStream(
    name="momo-worker-default",
    retention=RetentionPolicy.WORK_QUEUE,
    max_age=60*60*1*3, # 3 hours
    declare=True,
    storage=StorageType.MEMORY,
    allow_direct=False,
)

class NATSBuckets(Enum):
    MOMO_ORGANISATION_EVENTS = "momo-organisation-events"
    MOMO_ORGANISATION_FEEDS = "momo-organisation-feeds"
    MOMO_ORGANISATION_MEMBERS = "momo-organisation-members"


class NATSService:
    def __init__(self, broker: NatsBroker):
        self.broker = broker


    async def init(self):
        """
        create KV stores for each bucket
        """
        try:
            tasks = []
            for bucket in NATSBuckets:
                tasks.append(asyncio.create_task(self.broker.stream.create_key_value(KeyValueConfig(
                    bucket=bucket.value,
                    ttl=60*60*24*7, # 1 week
                    storage=StorageType.MEMORY,
                ))))

            await asyncio.gather(*tasks)
        except Exception as e:
            logging.error(f"Error creating key value stores: {e}")


    @alru_cache(maxsize=16)
    async def _key_value(self, bucket):
        return await self.broker.stream.key_value(bucket)


    async def get_key_value(self, bucket: str, key: str):
        with trace.get_tracer_provider().get_tracer("nats").start_as_current_span("get key value") as span:
            span.set_attribute("bucket", bucket)
            span.set_attribute("key", key)

            try:
                result = await (await self._key_value(bucket)).get(key)
                return result
            except KeyNotFoundError as e:
                return None
            except Exception as e:
                logging.error(f"Error getting key value: {e}")
                sentry_sdk.capture_exception(e)
                return None

    async def set_key_value(self, bucket: str, key: str, value: bytes):
        with trace.get_tracer_provider().get_tracer("nats").start_as_current_span("set key value") as span:
            span.set_attribute("bucket", bucket)
            span.set_attribute("key", key)

            try:
                await (await self._key_value(bucket)).put(key, value)
            except Exception as e:
                logging.error(f"Error setting key value: {e}")
                sentry_sdk.capture_exception(e)

And here is our Dishka setup for the NATS Worker service, which handles NATS messages. The broker is used by the NATSService and the router by the FastAPI FastStream implementation:

class NATSProvider(Provider):
    def __init__(self, broker: NatsBroker):
        super().__init__(scope=Scope.APP)
        self.broker = broker

    @provide(scope=Scope.APP)
    async def nats_service(self) -> NATSService:
        client = await self.broker.connect()

        nats_service = NATSService(self.broker)
        await nats_service.init()

        return nats_service




def _make_nats_broker() -> Tuple[NatsBroker, NatsRouter]:
    async def error_cb(e):
        logging.error(f"Error in NATS: {e}")
        sentry_sdk.capture_exception(e)

    tracer_provider = trace.get_tracer_provider()

    _broker = NatsBroker(
        get_config().NATS_SERVER_URL,
        user_credentials="./nats-production-momo-api.creds",
        error_cb=error_cb,
        allow_reconnect=True,
        middlewares=(
            NatsTelemetryMiddleware(
                tracer_provider=tracer_provider,
            ),
        )
    )

    _router = NatsRouter(
        get_config().NATS_SERVER_URL,
        user_credentials="./nats-production-momo-api.creds",
        error_cb=error_cb,
        middlewares=(
            NatsTelemetryMiddleware(
                tracer_provider=tracer_provider,
            ),
        )
    )

    return _broker, _router

def make_container() -> Tuple[AsyncContainer, NatsBroker, NatsRouter]:
    nats_broker, nats_router = _make_nats_broker()
    container = make_async_container(
        DBProvider(),
        FastStreamProvider(),
        NATSProvider(broker=nats_broker),
        ServiceProvider(),
    )

    return container, nats_broker, nats_router

And lastly the Worker App:

import asyncio
import random
from typing import Annotated
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
from faststream.nats import DeliverPolicy, PullSub
from dishka.integrations.faststream import inject, FromDishka
from api.auth import firebase_initialization
from common.config import get_config
from common.exceptions.handlers import init_exception_handlers
from common.middleware import init_middleware
from common.observability import init_observability
from common.services.nats_service import MOMO_WORKER_DEFAULT
from common.services.user_service import UserService
from di import make_container
from dishka.integrations.faststream import setup_dishka as setup_faststream_ioc
from dishka.integrations.fastapi import setup_dishka as setup_fastapi_ioc

from common.schemas.worker import UserEmailSummarySendPayload

#
# DI container
#
container, _, nats_router = make_container()

#
# FastStream setup
#
setup_faststream_ioc(
    container,
    nats_router,
    finalize_container=False,
)

#
# FastAPI setup
#
_fastapi_app = FastAPI(
    title="momo-worker",
    lifespan=nats_router.lifespan_context,
    docs_url=get_config().swagger_url(),
    redoc_url=get_config().redoc_url(),
    debug=get_config().DEBUG,
    default_response_class=ORJSONResponse,
)

setup_fastapi_ioc(container, _fastapi_app)
init_observability("momo-worker", _fastapi_app)
init_exception_handlers(_fastapi_app)
init_middleware(_fastapi_app)
firebase_initialization()

# NOTE: subject: ENTITY.DOMAIN.ACTION
# e.g. user.email.summary.send
# e.g. organisation.feed.reset
# e.g. organisation.event.created
# e.g. organisation.event.deleted
# e.g. organisation.event.started
# e.g. organisation.event.ended
# e.g. organisation.member.added

#
# Handlers
#

@nats_router.subscriber(
    "scheduled.bimonthly.user.email.summary.send",
    durable="bimonthly-user-email-summary-send",
    stream=MOMO_WORKER_DEFAULT,
    max_workers=10,
    deliver_policy=DeliverPolicy.ALL,
    pull_sub=PullSub(batch_size=10),
)
@inject
async def handler(user_service: Annotated[UserService, FromDishka()]):
    users = await user_service.get_all_users()

    tasks = []

    for user in users:
        tasks.append(asyncio.create_task(nats_router.broker.publish(UserEmailSummarySendPayload(user_id=user.uid), "user.email.summary.send")))

    await asyncio.gather(*tasks)


@nats_router.subscriber(
    "user.email.summary.send",
    durable="user-email-summary-send",
    stream=MOMO_WORKER_DEFAULT,
    max_workers=10,
    deliver_policy=DeliverPolicy.ALL,
    pull_sub=PullSub(batch_size=10)
)
@inject
async def send_user_email_summary(msg: UserEmailSummarySendPayload, user_service: Annotated[UserService, FromDishka()]):
    print(f"Sending email to {msg.user_id}")
    try:
        user = await user_service.get_user(msg.user_id)
    except Exception as e:
        pass
    await asyncio.sleep(random.randint(1, 5))
    return msg


#
# Add routers
#
_fastapi_app.include_router(nats_router)

app = _fastapi_app

Expected behavior
We expect our NATSService and FastStream app to keep a stable connection to the NATS server (hosted on Scaleway) without these errors, and to reconnect automatically, which we understand to be the default behaviour.

Observed behavior
We observe the following connection errors. Does this suggest that the broker is not reconnecting?

Error in NATS: nats: 'Authentication Timeout'
Error in NATS: nats: connection closed
Error in NATS: Connection lost
Error setting key value: nats: no response from stream
Error in NATS: [Errno 104] Connection reset by peer
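
One way to tell whether the client is actually reconnecting is to record the connection lifecycle events next to the errors. The sketch below is only an illustration: `ConnectionEventLog` and its method names are hypothetical, and it assumes the zero-argument async callbacks that nats-py accepts as `disconnected_cb`, `reconnected_cb` and `closed_cb`. Whether FastStream 0.5.13 forwards these keyword arguments unchanged has not been checked here.

```python
import logging
import time
from typing import List, Tuple

logger = logging.getLogger("nats.lifecycle")


class ConnectionEventLog:
    """Hypothetical helper: records connection lifecycle events in order."""

    def __init__(self) -> None:
        self.events: List[Tuple[float, str]] = []

    def _record(self, name: str) -> None:
        # Store a monotonic timestamp so the gap between events can be measured.
        self.events.append((time.monotonic(), name))
        logger.warning("NATS connection event: %s", name)

    async def on_disconnected(self) -> None:
        self._record("disconnected")

    async def on_reconnected(self) -> None:
        self._record("reconnected")

    async def on_closed(self) -> None:
        self._record("closed")

    def names(self) -> List[str]:
        return [name for _, name in self.events]

    def recovered(self) -> bool:
        """True if no disconnect is left unanswered and the client never closed."""
        pending = False
        for name in self.names():
            if name == "disconnected":
                pending = True
            elif name == "reconnected":
                pending = False
            elif name == "closed":
                return False
        return not pending
```

Assuming the keyword names above are accepted, the instance could be wired in next to the existing `error_cb`, for example `disconnected_cb=log.on_disconnected, reconnected_cb=log.on_reconnected, closed_cb=log.on_closed`. A "disconnected" entry that is never followed by "reconnected", or a "closed" entry outside of shutdown, would suggest that the client stopped retrying.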

Environment

Running FastStream 0.5.13 with CPython 3.11.6 on Darwin

Additional context
We have contacted Scaleway support.
They stated that the resource limits are:

300Mb total in streams/KV
50 streams/KV
50 consumers per stream

This is more than enough, and we are not exceeding it.

They also stated that we must make sure to handle automatic reconnection, but to our understanding this is the default behaviour.

So are we doing something totally wrong in our implementation?
The errors do not occur all the time; they appear at irregular intervals.
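
Since the question is whether reconnection really is the default, it may help to set the reconnect behaviour explicitly instead of relying on defaults. The helper below is a minimal sketch: `reconnect_options` is a hypothetical name, the keys are the keyword arguments of nats-py's `connect`, and the values are placeholders rather than recommendations. As far as I know, nats-py reconnects by default but only for a finite number of attempts, and a non-positive `max_reconnect_attempts` removes that cap. Both points should be verified against the installed version.

```python
from typing import Any, Dict


def reconnect_options(
    reconnect_wait_s: float = 2.0,
    connect_timeout_s: float = 5.0,
    ping_interval_s: int = 30,
) -> Dict[str, Any]:
    """Hypothetical helper: explicit reconnect settings as keyword arguments."""
    return {
        "allow_reconnect": True,
        # Assumption: a non-positive value means "keep retrying" in nats-py.
        "max_reconnect_attempts": -1,
        # Seconds to wait between reconnect attempts.
        "reconnect_time_wait": reconnect_wait_s,
        # Seconds allowed for a single connection attempt.
        "connect_timeout": connect_timeout_s,
        # Ping more often so a dead connection is noticed sooner.
        "ping_interval": ping_interval_s,
        "max_outstanding_pings": 2,
    }
```

Assuming `NatsBroker` passes these keywords through to the client, the dict could be unpacked into the constructor, e.g. `NatsBroker(url, error_cb=error_cb, **reconnect_options())`, where `url` stands for the server URL already used above.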

theobouwman added the bug label Jul 10, 2024
Lancetnik added the NATS label Jul 10, 2024

Lancetnik (Collaborator) commented

nats-py provides connection recovery, but it seems that recovery doesn't work if authentication has failed. It also looks like authentication fails on a connection timeout, too...
I need to dig into nats-py to find the reason.
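
Until the cause is found, a possible workaround is to re-establish the connection from the application side when the client gives up. The sketch below is a generic retry loop with exponential backoff, not something nats-py or FastStream provides: `connect_with_backoff` is a hypothetical helper and `connect` is any zero-argument coroutine function supplied by the caller. Whether calling `broker.connect()` again on a closed FastStream broker restores subscriptions has not been verified.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

C = TypeVar("C")


async def connect_with_backoff(
    connect: Callable[[], Awaitable[C]],
    max_attempts: int = 5,
    base_delay_s: float = 0.5,
    max_delay_s: float = 30.0,
) -> C:
    """Call `connect` until it succeeds, doubling the delay after each failure."""
    delay = base_delay_s
    last_error: Optional[BaseException] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect()
        except Exception as exc:  # in real code, narrow this to connection/auth errors
            last_error = exc
            if attempt == max_attempts:
                break
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay_s)
    # Chain the last failure so the original cause stays visible in tracebacks.
    raise ConnectionError(f"gave up after {max_attempts} attempts") from last_error
```

Such a helper could be triggered from a `closed_cb`-style callback or at service start-up. The final `ConnectionError` keeps the last underlying exception as its cause, so an authentication timeout can still be distinguished from a reset connection.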
