Ronak Nathani edited this page Aug 30, 2016 · 2 revisions

Table of Contents

  1. Introduction
  2. Requirements
  3. Dependencies
  4. Clone Secor
  5. Configure Secor
  6. Run Secor

Introduction

Secor is a horizontally scalable consumer service, written in Java, developed by Pinterest to consume and upload Kafka messages to S3, Google Cloud Storage or OpenStack Swift. For the purposes of the program, this tutorial will go through configuring Secor for S3.

Secor uses Hadoop’s jars to upload data to S3. It supports two file formats:

  • Sequence Files (default): Flat file containing binary key value pairs. Messages are stored as values in the key value pairs.
  • Delimited Text Files: A newline-delimited raw text file.

Secor saves the last offset consumed for each topic/partition in Zookeeper and ensures that messages are written exactly once to S3. It can discover new topics and new messages in an existing topic/partition, and it recovers from failure by resuming from the last offset committed to S3. It also lets you whitelist and blacklist topics so that you consume only a subset of them. With Delimited Text Files, it can compress data with gzip, which can reduce storage by an order of magnitude. Secor is simple to set up and is worth the time spent. You can read more about its design in the Secor repository.

Secor can be set up on any machine and doesn’t have to sit on the Kafka brokers. However, the machine you install Secor on must have Hadoop (or at least the Hadoop jars) installed so that it can upload data to S3; the Hadoop server doesn’t need to be running. Secor can also be installed on multiple machines and configured to consume messages from different topics to achieve parallelism. For simplicity, you can install Secor on one of your Kafka brokers. In that case, make sure Hadoop is installed on your Kafka cluster; you can stop the Hadoop server to save memory if you are not using Hadoop for anything else on that cluster.

Requirements

Zookeeper, Kafka, Hadoop

Dependencies

The Secor jar is built with Maven. If you don’t have Maven installed, install it on the node where you will set up Secor.

node:~$ sudo apt-get update
node:~$ sudo apt-get upgrade
node:~$ sudo apt-get install maven 
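
Before moving on, it can help to confirm that the command-line tools used in the rest of this tutorial are actually on the PATH. The following is a minimal sketch; the function name missing_tools is a hypothetical helper, not part of Secor or Maven.

```shell
#!/bin/sh
# Hypothetical helper: report which of the given commands are missing from PATH.
# Prints one missing command name per line; prints nothing if all are present.
missing_tools() {
    for tool in "$@"; do
        command -v "$tool" >/dev/null 2>&1 || echo "$tool"
    done
}

# Tools invoked directly in this tutorial: git, mvn, java and tar.
missing=$(missing_tools git mvn java tar)
if [ -n "$missing" ]; then
    # $missing is left unquoted on purpose so the names print on one line.
    echo "Missing tools:" $missing
else
    echo "All build tools found"
fi
```

If anything is reported missing, install it with apt-get as shown above before cloning Secor.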

Clone Secor

We will clone Secor and build the jar with Maven, installing Secor in /usr/local.

node:~$ cd /usr/local/
node:~$ sudo git clone https://github.com/pinterest/secor.git
node:~$ sudo chown -R ubuntu secor
node:~$ cd ./secor/
node:~$ mvn package
The jar is saved in /usr/local/secor/target.
node:~$ mkdir bin
node:~$ tar -zxvf ./target/secor-0.21-SNAPSHOT-bin.tar.gz -C bin
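
The version in the tarball name (0.21-SNAPSHOT above) depends on what you cloned, so the exact file name may differ on your node. As a sketch, assuming the build output keeps the secor-<version>-bin.tar.gz naming seen above, a small hypothetical helper can locate the tarball without hardcoding the version:

```shell
# Hypothetical helper: find the Secor binary tarball in a target directory,
# whatever its version. Assumes the secor-<version>-bin.tar.gz naming pattern.
# Prints the path of the first match, or returns non-zero if none exists.
find_secor_tarball() {
    target_dir=$1
    for f in "$target_dir"/secor-*-bin.tar.gz; do
        if [ -f "$f" ]; then
            echo "$f"
            return 0
        fi
    done
    return 1
}

# Usage from /usr/local/secor, after `mvn package`:
#   tarball=$(find_secor_tarball ./target) && mkdir -p bin && tar -zxvf "$tarball" -C bin
```

The jar name used later in the run command carries the same version string, so adjust it to match whatever this finds.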

Configure Secor

Here we set up the AWS Keys, Kafka topics and output file format.

node:~$ cd /usr/local/secor/bin
node:~$ vim secor.common.properties
Change the properties discussed below in /usr/local/secor/bin/secor.common.properties. Lines starting with # and lines of the form key=value are from the file; the sentences in between explain what to set.

… … …
# See the License for the specific language governing permissions and
# limitations under the License.

############
# MUST SET #
############

# Regular expression matching names of consumed topics.
secor.kafka.topic_filter=.*

By default, Secor consumes all topics. Use a regular expression to select the topics you want to consume. For example, to consume topics named my-topic1, my-topic2, etc., set this property to my-topic.*, or set it to the name of the single topic you want to consume.

secor.kafka.topic_blacklist=

Use this property to blacklist topics. As with the filter, set it to a regular expression describing the topics you want to exclude, or to the name of a single topic.

# Choose what to fill according to the service you are using
# in the choice option you can fill S3, GS, Swift or Azure
cloud.service=S3

# AWS authentication credentials.
# Leave empty if using IAM role-based authentication with s3a filesystem.
aws.access.key=
aws.secret.key=
aws.role=
… … …
# installations with messages missing timestamp field
message.timestamp.required=true

# To enable compression, set this to a valid compression codec implementing
# org.apache.hadoop.io.compress.CompressionCodec interface, such as
# 'org.apache.hadoop.io.compress.GzipCodec'.
secor.compression.codec=

If you are using the Delimited Text File format, set this property to org.apache.hadoop.io.compress.GzipCodec. If you are using the Sequence File format, leave it blank: gzip compression and the Sequence File format can’t be used together.

# To set a custom file extension set this to a valid file suffix, such as
# '.gz', '.part', etc.
secor.file.extension=

Set this to .gz only if you are using gzip compression with the Delimited Text File format, as set in the property above. Leave it blank otherwise.

# The secor file reader/writer used to read/write the data, by default we write sequence files
secor.file.reader.writer.factory=

The default value is com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory, for the Sequence File format. Change it to com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory for the text file format.

# Max message size in bytes to retrieve via KafkaClient. This is used by ProgressMonitor and PartitionFinalizer.
# This should be set large enough to accept the max message size configured in your kafka broker
# Default is 0.1 MB
secor.max.message.size.bytes=10000000
… … …
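
Since secor.kafka.topic_filter and secor.kafka.topic_blacklist are regular expressions, it can be worth checking which topic names they would select before starting Secor. The sketch below approximates this with grep. It assumes the expressions are matched against the whole topic name (hence -x), and grep’s extended syntax is not identical to Java’s, so treat it as a rough check for simple patterns only. The function name select_topics is hypothetical.

```shell
# Hypothetical helper: read topic names from stdin and print those that the
# filter selects and the blacklist does not reject.
# Assumption: whole-name matching, approximated here with `grep -E -x`.
select_topics() {
    filter=$1
    blacklist=$2
    if [ -n "$blacklist" ]; then
        grep -E -x -e "$filter" | grep -E -x -v -e "$blacklist"
    else
        grep -E -x -e "$filter"
    fi
}

# Filter my-topic.* with my-topic2 blacklisted: prints only my-topic1.
printf '%s\n' my-topic1 my-topic2 other-topic | select_topics 'my-topic.*' 'my-topic2'
```

With an empty blacklist and the default filter .*, all three names are printed, which matches the default behaviour of consuming every topic.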

Now, we will set the Kafka brokers, zookeeper servers, S3 buckets and upload policy.

node:~$ vim secor.prod.properties
Change the properties discussed below in /usr/local/secor/bin/secor.prod.properties. As before, the sentences between the file excerpts explain what to set.

… … …
# See the License for the specific language governing permissions and
# limitations under the License.

include=secor.common.properties

############
# MUST SET #
############

# Name of one (random) Kafka broker host that is used to retrieve metadata.
# TODO(pawel): use a list of nodes or even better, extract active brokers from zookeeper.
kafka.seed.broker.host=<public DNS of a Kafka broker>

# List of Kafka Zookeeper servers.
zookeeper.quorum=

Add a comma-separated list of Zookeeper server DNS names with their ports, e.g. <public_dns_1>:2181,<public_dns_2>:2181,<public_dns_3>:2181. Do not put spaces around the commas or the ‘=’ sign.

# Fill the section which fits your needs
###############
# Using S3 #
###############

# Name of the s3 bucket where log files are stored.
secor.s3.bucket=<your s3 bucket where you want to store the data>
… … …
################
# END MUST SET #
################

Below are the upload policies Secor uses to upload data to S3. When either of the conditions below is met, Secor starts uploading files to S3. The defaults are fine. However, if you are only testing the setup, reduce secor.max.file.size.bytes (to, say, 100) or secor.max.file.age.seconds (to, say, 10) so that Secor triggers an upload quickly and you can verify that your data appears in S3. For your project, set these properties back to higher values (the defaults are fine).

# Upload policies.
# 200MB
secor.max.file.size.bytes=200000000
# 1 hour
# for hourly ingestion/finalization, set this property to smaller value, e.g. 1800
secor.max.file.age.seconds=3600
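
If you switch between test values and the defaults often, editing the file in vim each time gets tedious. The sketch below shows one possible way to set a property from the command line. The function set_prop is a hypothetical helper, not something Secor ships, and it only handles plain key=value lines with no leading whitespace.

```shell
# Hypothetical helper: set key=value in a properties file, replacing an
# existing uncommented assignment or appending one if the key is absent.
set_prop() {
    file=$1 key=$2 value=$3
    tmp=$(mktemp)
    # awk compares the text before the first '=' literally, so the dots in
    # property names are not treated as regex wildcards.
    awk -v k="$key" -v v="$value" '
        BEGIN { found = 0 }
        {
            pos = index($0, "=")
            if (pos > 0 && substr($0, 1, pos - 1) == k) {
                print k "=" v
                found = 1
            } else {
                print
            }
        }
        END { if (!found) print k "=" v }
    ' "$file" > "$tmp" && cat "$tmp" > "$file"
    rm -f "$tmp"
}

# Usage from /usr/local/secor/bin, lowering the thresholds for a quick test:
#   set_prop secor.prod.properties secor.max.file.size.bytes 100
#   set_prop secor.prod.properties secor.max.file.age.seconds 10
```

Run the same calls with 200000000 and 3600 afterwards to restore the defaults shown above.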

Run Secor

node:~$ cd /usr/local/secor/bin
node:~$ sudo java -ea -Dsecor_group=secor_backup -Dlog4j.configuration=log4j.prod.properties -Dconfig=secor.prod.backup.properties -cp secor-0.21-SNAPSHOT.jar:lib/* com.pinterest.secor.main.ConsumerMain

For convenience, save the two commands above in a shell script so that you can start Secor from anywhere by running that script.
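
One way to create such a script is sketched below. The file name run_secor.sh is an arbitrary choice, and the jar name assumes the 0.21-SNAPSHOT version used in this tutorial; adjust it to the version you built.

```shell
# Write the wrapper script to the current directory (file name is arbitrary).
script=./run_secor.sh

cat > "$script" <<'EOF'
#!/bin/sh
# Start the Secor consumer from its install directory, wherever this is called from.
cd /usr/local/secor/bin || exit 1
# The classpath is quoted so that java, not the shell, expands lib/*.
exec sudo java -ea \
    -Dsecor_group=secor_backup \
    -Dlog4j.configuration=log4j.prod.properties \
    -Dconfig=secor.prod.backup.properties \
    -cp "secor-0.21-SNAPSHOT.jar:lib/*" \
    com.pinterest.secor.main.ConsumerMain
EOF

# Make it executable so it can be run as ./run_secor.sh or by full path.
chmod +x "$script"
```

The cd inside the script matters because the properties files and the lib directory are referenced by relative path.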

Secor first saves the Kafka messages locally. When one of the conditions in the upload policy is met, it starts uploading the files to S3. Logs and messages are stored locally in the following directory:

/mnt/secor_data
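
While you wait for the first upload to trigger, a quick look at this directory shows whether Secor is consuming anything at all. A minimal sketch, using a hypothetical helper count_files that makes no assumption about the directory layout beneath /mnt/secor_data:

```shell
# Hypothetical helper: print how many regular files exist under a directory
# (prints 0 if the directory does not exist).
count_files() {
    dir=$1
    if [ -d "$dir" ]; then
        find "$dir" -type f | wc -l | tr -d ' '
    else
        echo 0
    fi
}

echo "files under /mnt/secor_data: $(count_files /mnt/secor_data)"
```

If the count stays at 0 while messages are being produced to Kafka, check the topic filter and the Zookeeper and broker settings configured earlier.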