Skip to content

huseyinezdemir/rabbitmq

 
 

Repository files navigation

rabbitmq Wrapper

RabbitMq Wrapper is a client API for RabbitMQ.

  • A wrapper over amqp exchanges and queues.
  • In-memory retries for consuming messages when an error occurs
  • CorrelationId and MessageId structure
  • Exchange types: Direct, Fanout, Topic, ConsistentHashing
  • Retry policy (immediate, interval)
  • Multiple consumers in a single process
  • Create goroutines and consume messages asynchronously
  • Optionally disable asynchronous message consumption
  • Retry connecting to another node when a RabbitMq node is down or the connection is broken
  • Add the stack trace to the message header if an error occurs while the message is consumed
  • Some extra features for publishing messages (will be added)

To connect to a RabbitMQ broker...

	var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost")

To connect to a RabbitMQ broker with retry policy

  • Consumer retries two times immediately if an error occurs

     var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost",
                                               rabbit.RetryCount(2,time.Duration(0)))
    
  • Create goroutines and consume messages asynchronously using the PrefetchCount option. One goroutine is created per unit of PrefetchCount.

    	var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost",
                    		rabbit.PrefetchCount(3))
    

To send a message

    // Added

To consume a message

    onConsumed := func(message rabbit.Message) error {
    
    		var consumeMessage PersonV1
    		var err= json.Unmarshal(message.Payload, &consumeMessage)
    		if err != nil {
    			return err
    		}
    		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
    		return nil
    	}
    

    rabbitClient.AddConsumer("In.Person").
    SubscriberExchange("RoutinKey.*",rabbit.Direct ,"Person").
    HandleConsumer(onConsumed)

To consume multiple messages

	onConsumed := func(message rabbit.Message) error {

		var consumeMessage PersonV1
		var err= json.Unmarshal(message.Payload, &consumeMessage)
		if err != nil {
			return err
		}
		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
		return nil
	}

	onConsumed2 := func(message rabbit.Message) error {

		var consumeMessage PersonV4
		var err= json.Unmarshal(message.Payload, &consumeMessage)
		if err != nil {
			return err
		}
		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
		return nil
	}
	rabbitClient.AddConsumer("In.Person3").
            SubscriberExchange("",rabbit.Fanout ,"ExchangeNamePerson").
            HandleConsumer(onConsumed)
            
    rabbitClient.AddConsumer("In.Person").
             SubscriberExchange("Person.*",rabbit.Direct ,"PersonV1").
             HandleConsumer(onConsumed2)
             

	rabbitClient.RunConsumers()

To consume from multiple exchanges

    rabbitClient.AddConsumer("In.Lines").
    		SubscriberExchange("1", rabbit.ConsistentHashing,"OrderLineAdded").
    		SubscriberExchange("1", rabbit.ConsistentHashing,OrderLineCancelled).
    		WithSingleGoroutine(true).
    		HandleConsumer(onConsumed2)

About

RabbitMq Wrapper

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Go 100.0%