Skip to content

Rinsvent/requester

Repository files navigation

Requester

Сервис для выполнения HTTP-запросов с поддержкой rate limiting, retry механизмов, кеширования и горизонтального масштабирования.

Use cases

  1. Proxy — размещение на серверах без блокировок для проксирования запросов к заблокированным или недоступным эндпоинтам.
  2. Кеширование ответов — снижение нагрузки на свои и внешние интеграции, повторное использование ответов без повторных запросов.
  3. Лимитирование запросов — защита от 429 и потери квот внешних API через настраиваемые rate limiters.
  4. Retry до победного — повторные попытки с экспоненциальной задержкой для критичных операций (например, webhook) вплоть до попыток через месяцы и больше.
  5. Отложенные запросы — планирование выполнения на любой период вплоть до лет через поле scheduled.
  6. Wake up — отправка в Kafka/API сообщений, которые запускают процессы в нужное время (планировщик, будильники для микросервисов).
  7. Гарантия порядка — ordered-партиции (meta.partition с !) обеспечивают строгую последовательность обработки сообщений без перемешивания.
  8. Неблокирующие партиции — в Kafka один ключ блокирует всю партицию; здесь разные ключи обрабатываются параллельно, один «зависший» не блокирует остальные.
  9. Высокий RPS — 2k+ запросов в секунду на типичном железе за счёт пула воркеров и оптимизированного Kafka fetch.

Структура проекта

requester/
├── main.go            # Точка входа приложения (CLI)
├── internal/          # Внутренние пакеты
│   ├── cache/        # Кеширование ответов
│   ├── config/      # Конфигурация
│   ├── handler/      # HTTP handlers
│   ├── hub/          # Hub для синхронных запросов
│   ├── kafka/        # Kafka producer/consumer
│   ├── models/       # Модели данных
│   ├── pgqueue/      # PostgreSQL очередь (outbox pattern)
│   ├── processor/    # Обработчик задач
│   ├── ratelimiter/  # Rate limiting стратегии
│   ├── redis/        # Redis клиент
│   ├── redisqueue/   # Redis очередь (sorted set)
│   ├── retry/        # Retry механизм
│   └── utils/        # Утилиты
├── docker-compose.yaml
├── http/               # JetBrains .http файлы для тестирования API
│   ├── requester.http  # Basic, limiter, retry, cache, partition, combined
│   └── http-client.env.example
└── Makefile

Запуск

# Запуск инфраструктуры (Kafka, Redis, PostgreSQL)
make docker-up

# Сборка
make build

# Запуск сервера
./bin/requester server
# или
make run

# Просмотр доступных команд
./bin/requester --help

Архитектура

Поток данных

HTTP API ──► Kafka (in) ──► Router ──┬──► Redis Queue ──► Workers ──► Kafka (out)
                                     │        
                                     │        
                                     └──► PostgreSQL ──► PG Mover ──► Kafka (in) ──┘
  1. HTTP API принимает запросы и отправляет их в Kafka топик in.
  2. Kafka in consumer (один воркер на весь кластер через consumer group) читает задачи и маршрутизирует:
    • Если задача готова к выполнению или запланирована в пределах 1 секунды → отправляется в Redis Queue.
    • Если задача запланирована более чем на 1 секунду в будущее → отправляется в PostgreSQL (outbox).
  3. PG → Kafka Mover периодически опрашивает PostgreSQL и перекладывает готовые задачи (scheduled ≤ now) в Kafka топик in, откуда они штатно попадают в Redis Queue.
  4. Workers читают задачи из Redis Queue и выполняют HTTP-запросы через processor.ProcessTask.
  5. Результат отправляется в Kafka топик out.

Retry (повторные попытки)

При получении rate limit ошибки задача маршрутизируется обратно:

  • Если время следующей попытки ≤ 1 секунды → задача отправляется в Redis Queue с отложенным scheduled.
  • Если время следующей попытки > 1 секунды → задача отправляется в PostgreSQL.
  • ordered сообщения (партиция начинается с символа !) всегда возвращаются в redis с ttl блокировкой, чтобы гарантировать порядок

Конкурентная обработка

  • Redis Queue использует Lua-скрипты для атомарного получения задач. Параллельные воркеры не берут одну и ту же задачу.
  • PostgreSQL Queue использует SELECT FOR UPDATE SKIP LOCKED (outbox pattern) для безопасной работы нескольких инстансов.
  • Воркеры забирают только задачи с scheduled ≤ now, задачи в будущем не трогаются.

Режимы работы

  • Асинхронный (по умолчанию) — запросы обрабатываются через очереди, результаты возвращаются через Kafka топик out.
  • Синхронный (sync: true) — запрос выполняется, результат возвращается в Kafka топик out и в HTTP ответе. Подходит для случаев, когда нужен немедленный ответ.

Kafka

В сервисе используются 2 топика:

  1. in — входящие задачи. Топик должен иметь 1 партицию для гарантии обработки одним воркером.
  2. out — ответы для инициатора задачи.

Wake up (отложенная доставка)

Сервис поддерживает сценарий «wake up» — доставку сообщений с задержкой для потребления другими сервисами.

Возможности:

  1. Отложенная обработка — поле scheduled в meta позволяет запланировать выполнение задачи на будущее. Задачи с scheduled > 1s попадают в PostgreSQL outbox и возвращаются в Kafka in в момент наступления времени выполнения.

  2. Proxy pass-through — при uri: "" HTTP-запрос не выполняется: body и headers из запроса пробрасываются в ответ без изменений (status 200). Удобно для передачи произвольных данных через сервис без реального HTTP-вызова.

  3. proxyData — поле в meta (запрос) и meta (ответ), которое передаётся без изменений. Позволяет прикреплять пользовательские данные (идентификаторы, маршрутизацию и т.п.) для последующей обработки потребителями.

Типичный сценарий:

  • Сервис A отправляет задачу с uri: "", scheduled и proxyData (например, идентификатор целевого сервиса).
  • Requester откладывает выполнение и в нужный момент «пробрасывает» body в топик out.
  • Другие сервисы читают из out и обрабатывают сообщения.

Масштабирование потребителей:

Если потребителей out много, один топик может не подходить — разные сервисы конкурируют за одни и те же сообщения. Для fan-out из out в несколько очередей/топиков можно использовать Redpanda Connect Broker: он читает из out и рассылает сообщения в разные выходы (Kafka топики, очереди и т.д.) по паттерну fan_out.

API

Реализован endpoint POST /request

Асинхронный режим (по умолчанию, sync: false или не указан)

Request Body:

{
  "request": {
    "uri": "http://example.com/api",
    "method": "POST",
    "body": "{\"key\": \"value\"}",
    "headers": {
      "Content-Type": "application/json"
    }
  },
  "meta": {
    "requestId": "req-123",
    "traceId": "trace-456",
    "proxyData": {"target": "service-a"},
    "limiter": {
      "key": "limiter_key",
      "algorithm": "sliding-window-counter",
      "rates": [
        {"duration": "3s", "value": 3},
        {"duration": "1h", "value": 30}
      ]
    },
    "retry": {
      "max": 5,
      "delay": "1s",
      "maxDelay": "1h",
      "multiplier": 60,
      "jitter": 0.1
    },
    "cache": {
      "key": "cache_key",
      "ttl": "10s"
    }
  }
}

Response (HTTP 202 Accepted):

{
  "status": "accepted",
  "requestId": "req-123"
}

Синхронный режим (sync: true)

Request Body:

{
  "request": {
    "uri": "http://example.com/api",
    "method": "GET"
  },
  "meta": {
    "requestId": "req-123",
    "traceId": "trace-456",
    "sync": true,
    "cache": {
      "key": "cache_key",
      "ttl": "10s"
    },
    "proxyData": {"target": "service-a"}
  }
}

Response (HTTP 200 OK):

{
  "response": {
    "status": 200,
    "body": "response body",
    "headers": {
      "Content-Type": "application/json"
    }
  },
  "meta": {
    "requestId": "req-123",
    "traceId": "trace-456",
    "tryCount": 1,
    "time": "150ms",
    "cached": false,
    "waitTime": "0s",
    "proxyData": {"target": "service-a"}
  }
}

Особенности синхронного режима:

  • Запрос выполняется в том же инстансе, который принял HTTP запрос
  • При превышении rate limit возвращается HTTP 429 с заголовком Retry-After
  • Retry механизм не применяется (при ошибке возвращается сразу)

Rate limiter

Используется Redis для подсчета запросов.

Доступные стратегии:

  • Token Bucket (token-bucket)
  • Leaky Bucket (leaky-bucket)
  • Fixed Window Counter (fixed-window-counter)
  • Sliding Window Log (sliding-window-log)
  • Sliding Window Counter (sliding-window-counter)

Cache

Кеширование ответов в Redis. При запросе проверяется наличие кеша и его актуальность.

Performance

Количество воркеров настраивается через конфигурацию (MAX_WORKERS). Воркеры параллельно читают из Redis Queue.

Scaling

При добавлении новых инстансов:

  • Kafka in топик читается одним воркером (через consumer group).
  • Redis Queue и PostgreSQL Queue поддерживают конкурентный доступ — каждый инстанс безопасно обрабатывает свою долю задач.

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors