Data Pipeline for a distributed collection of Rails applications


Emit a versioned stream of changes to a pub/sub queue when ActiveRecord models are saved. This gem supports Redis, AWS (SNS/SQS), and IronMQ publishing targets. The Redis backend supports a forwarding system to a cloud MQ like IronMQ. Messages are encrypted in transit using AES symmetric encryption and a shared secret.
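The shared-secret encryption step can be pictured with a minimal sketch using Ruby's standard OpenSSL bindings. The cipher mode (AES-256-CBC), the SHA-256 key derivation, the IV-plus-ciphertext layout and the `PipelineCryptoSketch` name are all assumptions for illustration, not the gem's actual scheme.

```ruby
require "openssl"
require "digest"

# HYPOTHETICAL sketch: not the gem's actual encryption code. The cipher
# mode and key derivation below are illustrative assumptions.
module PipelineCryptoSketch
  CIPHER = "aes-256-cbc"
  IV_BYTES = 16 # AES block size

  # Derive a 32-byte AES-256 key from the shared secret via SHA-256.
  def self.key_for(secret)
    Digest::SHA256.digest(secret)
  end

  # Encrypt a (binary) payload; returns Base64 text carrying IV + ciphertext.
  def self.encrypt(plaintext, secret)
    cipher = OpenSSL::Cipher.new(CIPHER)
    cipher.encrypt
    cipher.key = key_for(secret)
    iv = cipher.random_iv # fresh random IV per message; it need not be secret
    ciphertext = cipher.update(plaintext) + cipher.final
    [iv + ciphertext].pack("m0") # strict Base64, no line breaks
  end

  # Reverse of encrypt: split off the IV, then decrypt the remainder.
  def self.decrypt(encoded, secret)
    raw = encoded.unpack1("m0")
    decipher = OpenSSL::Cipher.new(CIPHER)
    decipher.decrypt
    decipher.key = key_for(secret)
    decipher.iv = raw[0, IV_BYTES]
    decipher.update(raw[IV_BYTES..]) + decipher.final
  end
end

token = PipelineCryptoSketch.encrypt("serialized protobuf bytes", "shared-secret")
puts PipelineCryptoSketch.decrypt(token, "shared-secret")
```

Because the IV is random, encrypting the same message twice yields different ciphertexts. CBC mode on its own does not detect tampering; an authenticated mode such as AES-GCM would add that.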


Many systems evolve into an interrelated collection of Rails applications as they grow out of their initial monolithic design. This often coincides with independent teams taking responsibility for different parts of the business and different applications. In our case, one team is responsible for the ecommerce platform (web, mobile, etc.) and another works on data warehousing and personalization. The data team was using a read-only replica of the platform team's database for data warehousing, which tightly coupled the two teams whenever the platform team wanted to make schema changes for performance or development-velocity reasons.

This project lets us offer a versioned API whose backwards compatibility we can maintain while changing the underlying code and schema as we see fit. Consumers of the data upgrade to new schema versions when they need to and when they are able.


Install with Bundler by adding the gem to your Gemfile:

gem 'rails-pipeline'

You should also create a private repository containing your Protocol Buffers schemas and depend on it, or use git subtree to bring it into your Rails apps.

For any model whose changes you wish to publish, include the appropriate pipeline emitter:

include RailsPipeline::RedisEmitter

Each queue backend has a different way of consuming messages as a subscriber; for IronMQ there is a webhook subscriber implementation (details below).

The following environment variables control pipeline operations:

# If set, do not emit or process incoming messages

# If set, do not emit pipeline messages

# If set, do not process incoming messages as a subscriber, just drop them

The following environment variable sets the shared secret for pipeline encryption:


You can set a logger for RailsPipeline in an initializer, e.g.:

RailsPipeline.logger = Rails.logger
# Rails.logger is a stdlib Logger, so use Logger's level constants (not Log4r's)
RailsPipeline.logger.level = Logger::DEBUG



Redis

Redis Emitter: Only as a forwarding intermediary
Redis Subscriber: Only forwarder

The Redis implementation assumes you want to use Redis as a local forwarding queue in front of a more scalable service such as AWS or IronMQ. All messages are therefore pushed onto a single Redis queue, and each message includes the name of its target topic/queue. We have included a bouncer process that reads from the Redis queue (in parallel if need be) and forwards messages on to IronMQ. Adding an AWS forwarder should be straightforward.
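The single-queue forwarding pattern might look roughly like the sketch below. A plain Ruby Array stands in for the Redis list and a lambda stands in for the IronMQ client; `ForwardingQueueSketch`, `forward_all`, the JSON envelope format and the topic names are hypothetical, not the gem's bouncer.

```ruby
require "json"

# HYPOTHETICAL sketch of the single-queue forwarding pattern. An Array stands
# in for the one Redis list; the real gem's bouncer and message format differ.
class ForwardingQueueSketch
  def initialize
    @list = [] # stand-in for a single Redis list
  end

  # Emitter side: wrap the payload with its target topic (like LPUSH).
  # The payload should be a String, e.g. Base64 of the encrypted bytes.
  def push(topic, payload)
    @list.unshift(JSON.generate({ "topic" => topic, "payload" => payload }))
  end

  # Remove and decode the oldest envelope (like RPOP); nil when empty.
  def pop
    raw = @list.pop
    raw && JSON.parse(raw)
  end
end

# Bouncer side: drain the queue, forwarding each payload to its own topic.
# `publisher` stands in for an IronMQ (or SNS) client.
def forward_all(queue, publisher)
  forwarded = 0
  while (envelope = queue.pop)
    publisher.call(envelope["topic"], envelope["payload"])
    forwarded += 1
  end
  forwarded
end

queue = ForwardingQueueSketch.new
queue.push("orders_1_0", "b64-payload-1") # hypothetical topic names
queue.push("users_1_0", "b64-payload-2")
forward_all(queue, ->(topic, payload) { puts "#{topic}: #{payload}" })
```

With real Redis, the emitter side would be an LPUSH and each bouncer a blocking BRPOP on the same key. Since BRPOP removes an element atomically, several bouncers could drain the queue in parallel without two of them receiving the same message.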

It may be desirable to write a full pub/sub emitter for Redis (rather than just a forwarder).

Redis Config

The following environment variables are checked for Redis URLs (default: localhost:6379):


Alternatively, you can pass in an instance of the Redis client in an initializer:

RailsPipeline::RedisPublisher.redis = MyRedisFactory.get

The key name of the Redis queue should be set in an initializer, e.g.:

RailsPipeline::RedisPublisher.namespace = "rails-pipeline-spec"


IronMQ

IronMQ Emitter: YES
Redis-to-IronMQ Forwarder: YES
IronMQ Subscriber: YES


Each model/version emits ProtocolBuffer messages to a specific IronMQ queue. If that queue is configured as a "push" queue, subscribers can add themselves as HTTP webhook endpoints for it and messages will be delivered to them.


There is a subscriber implementation for IronMQ that runs as an HTTP endpoint.

Mount the Sinatra endpoint in your app's routes.rb:

match "/ironmq" => RailsPipeline::IronmqSubscriber, :anchor => false
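A Sinatra app is a Rack application: any object that responds to `call(env)` and returns `[status, headers, body]` can be routed to this way. A hypothetical, dependency-free sketch of such a webhook endpoint (not the gem's `RailsPipeline::IronmqSubscriber`) follows; returning a non-2xx status on failure is assumed to let the push queue retry delivery.

```ruby
require "stringio"

# HYPOTHETICAL, dependency-free sketch of a Rack-style webhook endpoint;
# it is not RailsPipeline::IronmqSubscriber.
class WebhookEndpointSketch
  # The block receives the raw request body (the encrypted message).
  def initialize(&handler)
    @handler = handler
  end

  # Rack contract: take an env Hash, return [status, headers, body].
  def call(env)
    unless env["REQUEST_METHOD"] == "POST"
      return [405, { "content-type" => "text/plain" }, ["POST only"]]
    end

    @handler.call(env["rack.input"].read)
    [200, { "content-type" => "text/plain" }, ["ok"]]
  rescue StandardError => e
    # Assumption: a non-2xx status makes the push queue retry later.
    [500, { "content-type" => "text/plain" }, [e.message]]
  end
end

endpoint = WebhookEndpointSketch.new { |body| puts "received #{body.bytesize} bytes" }
status, _headers, _body = endpoint.call({ "REQUEST_METHOD" => "POST", "rack.input" => StringIO.new("abc") })
puts status
```

A real subscriber would decrypt and decode the body before handing the message to the model registered for its type and version.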

Register your own models as recipients of different pipeline message types and versions (in a Rails initializer):

RailsPipeline::Subscriber.register(SomeModel_2_0, MyModel)

You will need to write a MyModel#from_pipeline_2_0 method. You can also register any Proc as a processor for messages.
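One way such version-aware dispatch could work is sketched below. `SubscriberRegistrySketch` is hypothetical and is not `RailsPipeline::Subscriber`: it derives the handler method name (`from_pipeline_2_0`) from the version suffix of the message class name, and it simply calls `new` on the registered model, whereas the real subscriber may look up an existing record instead.

```ruby
# HYPOTHETICAL sketch of version-aware dispatch; not the real
# RailsPipeline::Subscriber. Handlers are a Proc or a model class.
class SubscriberRegistrySketch
  VERSION_SUFFIX = /_+(\d+)_+(\d+)\z/ # matches "..._2_0" and "...__1__0"

  def initialize
    @handlers = {}
  end

  def register(message_class, handler)
    @handlers[message_class] = handler
  end

  # Route a decoded message to its registered handler.
  def dispatch(message)
    handler = @handlers.fetch(message.class) do
      raise ArgumentError, "no handler registered for #{message.class}"
    end
    return handler.call(message) if handler.is_a?(Proc)

    # Assumption: build a fresh instance; a real subscriber might instead
    # look up an existing record by the message's id.
    target = handler.new
    target.public_send(handler_method(message.class), message)
    target
  end

  private

  # "SomeModel_2_0" -> :from_pipeline_2_0
  def handler_method(message_class)
    match = VERSION_SUFFIX.match(message_class.name.to_s)
    raise ArgumentError, "unversioned message class #{message_class}" unless match

    :"from_pipeline_#{match[1]}_#{match[2]}"
  end
end

# Stand-ins for a generated protobuf class and a subscribing model.
SomeModel_2_0 = Struct.new(:id, :name)

class MyModel
  attr_reader :name

  def from_pipeline_2_0(message)
    @name = message.name
  end
end

registry = SubscriberRegistrySketch.new
registry.register(SomeModel_2_0, MyModel)
puts registry.dispatch(SomeModel_2_0.new(1, "widget")).name
```

Keying the registry on the versioned message class means a consumer can register a `from_pipeline_3_0` handler alongside the 2.0 one and migrate at its own pace.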

Add your URL as a subscriber to the push queues you care about, using the supplied 'pipeline' command:

pipeline ironmq-subscribe-endpoint some_models

You may find ngrok helpful for developing and debugging.

IronMQ Config

The 'iron_mq' gem picks up the following environment variables:


AWS (Simple Notification Service)

SNS Emitter: YES
Redis-to-SNS Forwarder: NO, but easy to add
SQS Polling Subscriber: NO
SNS Webhook Subscriber: NO

We include a proof-of-concept AWS emitter, written with the intention of using SQS queues for pub/sub with polling subscribers. It would also be possible to publish to SNS and have multiple subscribers receive HTTP webhook messages, as with IronMQ.

There are some commands in the 'pipeline' script to configure SNS/SQS:

Create SNS topics to publish to:

pipeline sns-create-topic TABLE_NAME --env ENV --version VERSION

Create an SQS queue and subscribe it to the given topics (one queue per subscribing Rails app):

pipeline sqs-subscribe-app APP TABLE_NAME[,TABLE_NAME_2,...] --env ENV --version VERSION

AWS Config

The AWS gem picks up the following environment variables:


In addition, we use the numerical 'owner id' for your account, which should be set as:


Protocol Buffers

To build the test Protocol Buffers Ruby files in rails-pipeline:

brew install protobuf

We have created a private repository gem for our Protocol Buffers definitions. It is laid out like this:


Proto files look like this:

	package Harrys.Pipeline;

	message Order__1__0 {
	  required int32 id = 1;
	  required double created_at = 2;
	  required double updated_at = 3;
	}

We then have a Makefile almost identical to the one in this gem to build our .pb.rb files:

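# GENDIR must be set to the directory holding your .proto files
RUBY_PROTOC=bundle exec ruby-protoc
PROTOS=$(wildcard $(GENDIR)/*.proto)
PBS=$(PROTOS:.proto=.pb.rb)

.PHONY: all clean

all: $(PBS)

%.pb.rb: %.proto
	$(RUBY_PROTOC) $<

clean:
	rm -f $(PBS)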