Engage Challenge

Docker SQL Server Apache Spark MongoDB Redis

Architecture

This project implements a NoSQL strategy to optimize queries executed on a SQL Server. The architecture presented here was built for that purpose. A REST API serves as the data source used to populate our SQL Server replica. The NoSQL databases are populated with data extracted from SQL Server by Debezium and structured with Spark. In the end, MongoDB is used as the main database for query execution, and Redis serves as a cache for the main results. In short:

  • Debezium: Kafka-based solution for CDC monitoring of SQL Server (see the example connector registration after this list);
  • Hadoop: long-term data store, used to append data in Parquet format;
  • MongoDB: main NoSQL storage of the solution;
  • Redis: in-memory database used to cache the main query results from MongoDB;
  • Spark: used to stream data from Debezium, then transform and ingest it into the leaf stores: Hadoop, MongoDB and Redis;
  • SQL Server: data source where the transactional data is stored.
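
As a sketch of how the Debezium side could be wired up, the snippet below registers a SQL Server connector with the Kafka Connect REST API. It is a minimal example assuming default service names (connect, sqlserver, kafka) and placeholder credentials; the repository's actual connector configuration may differ.

```python
import requests

# Hypothetical connector registration; hostnames, credentials and the
# connector name are illustrative, not taken from this repository.
connector = {
    "name": "engagedb-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.hostname": "sqlserver",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "<password>",
        "database.dbname": "engagedb",
        "database.server.name": "mssql",  # prefix of the CDC topic names
        "table.include.list": "dbo.answers,dbo.activities,dbo.rounds,dbo.groups,dbo.users",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.engagedb",
    },
}

response = requests.post("http://connect:8083/connectors", json=connector)
response.raise_for_status()
```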

Data flow

A Spark Streaming job is triggered for each topic of the SQL Server CDC and extracts the data to be inserted into HDFS and MongoDB. In this layer, the inserts occur as follows: HDFS only appends the extracted data in Hadoop, partitioned by extraction time; as MongoDB replaces documents that share the same _id, all the rows from SQL Server are upserted.

[Dataflow diagram]
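
A minimal sketch of one such job is shown below, assuming PySpark with the Kafka and MongoDB Spark connectors; hostnames, paths and the payload handling are illustrative, and the actual jobs do more parsing of the Debezium envelope than shown here.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("cdc-answers").getOrCreate()

# Read one CDC topic as a stream.
raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "mssql.engagedb.dbo.answers")
    .load()
)

# Keep the Debezium payload and tag each row with the extraction time,
# used as the HDFS partition column. (A real job would also extract the
# _id and the table columns from the payload here.)
rows = (
    raw.select(F.col("value").cast("string").alias("payload"))
    .withColumn("extracted_at", F.current_date())
)

def sink(batch_df, batch_id):
    # HDFS: append only, partitioned by extraction time.
    (batch_df.write.mode("append")
        .partitionBy("extracted_at")
        .parquet("hdfs://namenode:9000/engagedb/answers"))
    # MongoDB: documents sharing the same _id are replaced, so this
    # append behaves as an upsert.
    (batch_df.write.format("mongo").mode("append")
        .option("uri", "mongodb://mongo:27017/engagedb.answers")
        .option("replaceDocument", "true")
        .save())

rows.writeStream.foreachBatch(sink).start().awaitTermination()
```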

The second layer runs only when the first layer inserts new data and triggers a Kafka event (spark.answers). This layer is responsible for computing the rank aggregations in MongoDB. To do so, some auxiliary collections are merged by running lookup and group aggregations. The third layer is also driven by Kafka events (mongo.scores): in this layer, the aggregations created in the second layer are collected and transformed to be made available in Redis.
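
As an illustration of what such an aggregation might look like, the PyMongo pipeline below joins an auxiliary collection and materializes a per-user score ranking. The collection and field names (answers, users, user_id, points, userScores) are assumptions, not the repository's exact schema.

```python
from pymongo import MongoClient

client = MongoClient("mongodb://mongo:27017")
db = client["engagedb"]

pipeline = [
    # Join the auxiliary collection, then aggregate per user.
    {"$lookup": {
        "from": "users",
        "localField": "user_id",
        "foreignField": "_id",
        "as": "user",
    }},
    {"$unwind": "$user"},
    {"$group": {"_id": "$user._id", "score": {"$sum": "$points"}}},
    {"$sort": {"score": -1}},
    # Materialize the ranking into its own collection (MongoDB 4.2+).
    {"$merge": {"into": "userScores", "whenMatched": "replace"}},
]
db.answers.aggregate(pipeline)
```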

To monitor the execution times of the ranking queries, you can run make profile. The profiling script runs 10,000 iterations against each database (SQL Server, MongoDB and Redis), requesting userScores data. The output shows, in microseconds, the average, the standard deviation and the gain compared to the SQL Server execution time for each database:

| Database   | Type              | Mean        | Standard deviation | Gain  |
| ---------- | ----------------- | ----------- | ------------------ | ----- |
| SQL Server | SQL               | 5895.543 µs | 1752.899 µs        | -     |
| MongoDB    | NoSQL (documents) | 2832.174 µs | 931.078 µs         | 2.08x |
| Redis      | NoSQL (key-value) | 895.669 µs  | 336.608 µs         | 3.16x |
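
For reference, the measurement itself reduces to a simple timing loop like the one below. This is a sketch of the idea behind make profile, shown here against Redis only; the key name userScores and the Redis host are assumptions.

```python
import statistics
import time

import redis  # the script profiles SQL Server and MongoDB the same way


def profile(query_fn, iterations=10_000):
    """Time query_fn `iterations` times; return mean/stdev in microseconds."""
    timings = []
    for _ in range(iterations):
        start = time.perf_counter()
        query_fn()
        timings.append((time.perf_counter() - start) * 1e6)
    return statistics.mean(timings), statistics.stdev(timings)


redis_client = redis.Redis(host="redis", port=6379)
mean_us, stdev_us = profile(lambda: redis_client.get("userScores"))
print(f"Redis: {mean_us:.3f} µs ± {stdev_us:.3f} µs")
```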

Furthermore, a latency of ~15s is added by the streaming processing when running all the containers in a local environment. The sync jobs for the aggregations and for Redis take at most 2s to be triggered. So, the Redis cache should be refreshed within 19s and the MongoDB collections within 17s. All the profiling tests and execution time measurements were collected on an Intel Core i3 with 16GB of RAM running elementary OS.

Kafka topics

In principle, our SQL Server replica is built with the following schema.

[Schema diagram]

So, Debezium creates a topic to stream each one of the tables. A few other topics are created to handle the communication between streaming jobs, like mongo.scores, which is used to trigger the sync between the MongoDB and Redis aggregations. The following table presents these topics:

| Topics | Source | Description |
| --- | --- | --- |
| mssql.engagedb.dbo.answers, mssql.engagedb.dbo.activities, mssql.engagedb.dbo.rounds, mssql.engagedb.dbo.groups, mssql.engagedb.dbo.users | Debezium | Each event describes an insert, update or delete operation in the database engagedb and schema dbo. |
| spark.answers | Spark | Shows the number of rows processed in a micro-batch by Spark Streaming and ingested into MongoDB. When an event reaches this topic, a job is triggered to sync the MongoDB collections with the ranking aggregations. |
| mongo.scores | Kafka Python client | Presents the execution time to update the ranking aggregations in MongoDB. A job to sync the aggregations between MongoDB and Redis is triggered when an event reaches this topic. |
| redis.cache | Kafka Python client | Presents the execution time to update the ranking aggregations copied to Redis. |
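
To make the event-driven wiring concrete, the sketch below shows how the third layer could look with the kafka-python client: it blocks on mongo.scores, copies the MongoDB aggregation into Redis, and reports its own execution time on redis.cache. Hosts, key names and the payload format are illustrative.

```python
import json
import time

import redis
from kafka import KafkaConsumer, KafkaProducer
from pymongo import MongoClient

consumer = KafkaConsumer("mongo.scores", bootstrap_servers="kafka:9092")
producer = KafkaProducer(bootstrap_servers="kafka:9092")
mongo = MongoClient("mongodb://mongo:27017")
cache = redis.Redis(host="redis", port=6379)

for event in consumer:
    start = time.perf_counter()
    # Copy the materialized ranking from MongoDB into the Redis cache.
    scores = list(mongo.engagedb.userScores.find())
    cache.set("userScores", json.dumps(scores, default=str))
    # Report the sync duration on the next topic in the chain.
    elapsed = time.perf_counter() - start
    producer.send("redis.cache", json.dumps({"elapsed_s": elapsed}).encode())
```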

How to start

All the code is structured in Docker containers and Docker Compose files. As the containers are divided across multiple compose files, the Makefile targets help to manage them.

First of all, make init creates the necessary Docker requirements, so it should be run before starting the containers. It is implicitly called by the storage, debezium, streaming and all targets. For example, to start only the storage containers such as Hadoop, SQL Server, Redis and MongoDB:

```shell
make storage
```

To start only the Debezium services such as Kafka, Zookeeper and Debezium Connect:

```shell
make debezium
```

To start only the Spark jobs such as Hadoop, MongoDB and Redis ingestions:

```shell
make streaming
```

To start all the containers:

```shell
make all

# to stop them
make stop
```

User interface endpoints
