|
| 1 | +defmodule ElixirPopularity.RMQPublisher do |
| 2 | + @behaviour GenRMQ.Publisher |
| 3 | + |
| 4 | + @rmq_uri "amqp://rabbitmq:rabbitmq@localhost:5672" |
| 5 | + @hn_exchange "hn_analytics" |
| 6 | + @hn_item_ids_queue "item_ids" |
| 7 | + @hn_bulk_item_data_queue "bulk_item_data" |
| 8 | + @publish_options [persistent: false] |
| 9 | + |
| 10 | + def init do |
| 11 | + create_rmq_resources() |
| 12 | + |
| 13 | + [ |
| 14 | + uri: @rmq_uri, |
| 15 | + exchange: @hn_exchange |
| 16 | + ] |
| 17 | + end |
| 18 | + |
| 19 | + def start_link do |
| 20 | + GenRMQ.Publisher.start_link(__MODULE__, name: __MODULE__) |
| 21 | + end |
| 22 | + |
| 23 | + def hn_id_queue_size do |
| 24 | + GenRMQ.Publisher.message_count(__MODULE__, @hn_item_ids_queue) |
| 25 | + end |
| 26 | + |
| 27 | + def publish_hn_id(hn_id) do |
| 28 | + GenRMQ.Publisher.publish(__MODULE__, hn_id, @hn_item_ids_queue, @publish_options) |
| 29 | + end |
| 30 | + |
| 31 | + def publish_hn_items(items) do |
| 32 | + GenRMQ.Publisher.publish(__MODULE__, items, @hn_bulk_item_data_queue, @publish_options) |
| 33 | + end |
| 34 | + |
| 35 | + def item_id_queue_name, do: @hn_item_ids_queue |
| 36 | + |
| 37 | + def bulk_item_data_queue_name, do: @hn_bulk_item_data_queue |
| 38 | + |
| 39 | + defp create_rmq_resources do |
| 40 | + # Setup RabbitMQ connection |
| 41 | + {:ok, connection} = AMQP.Connection.open(@rmq_uri) |
| 42 | + {:ok, channel} = AMQP.Channel.open(connection) |
| 43 | + |
| 44 | + # Create exchange |
| 45 | + AMQP.Exchange.declare(channel, @hn_exchange, :topic, durable: true) |
| 46 | + |
| 47 | + # Create queues |
| 48 | + AMQP.Queue.declare(channel, @hn_item_ids_queue, durable: true) |
| 49 | + AMQP.Queue.declare(channel, @hn_bulk_item_data_queue, durable: true) |
| 50 | + |
| 51 | + # Bind queues to exchange |
| 52 | + AMQP.Queue.bind(channel, @hn_item_ids_queue, @hn_exchange, routing_key: @hn_item_ids_queue) |
| 53 | + AMQP.Queue.bind(channel, @hn_bulk_item_data_queue, @hn_exchange, routing_key: @hn_bulk_item_data_queue) |
| 54 | + |
| 55 | + # Close the channel as it is no longer needed |
| 56 | + # GenRMQ will manage its own channel |
| 57 | + AMQP.Channel.close(channel) |
| 58 | + end |
| 59 | +end |
0 commit comments