Skip to content

Commit

Permalink
remove support for legacy input/output formats
Browse files Browse the repository at this point in the history
  • Loading branch information
dspeterson committed Jul 9, 2016
1 parent aa84b6e commit 391d19e
Show file tree
Hide file tree
Showing 24 changed files with 36 additions and 1,374 deletions.
8 changes: 0 additions & 8 deletions doc/detailed_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,6 @@ bytes to write to discard report available from Dory's web interface. The
default value is 256.
* `--topic_autocreate`: Enable automatic topic creation. For this to work, the
brokers must be configured with `auto.create.topics.enable=true`.
* `--omit_timestamp`: Do not use this option, since it will soon be removed.
Its purpose is to provide compatibility with legacy infrastructure at if(we).
* `--use_old_input_format`: Do not use this option, since it will soon be
removed. Its purpose is to provide compatibility with legacy infrastructure at
if(we).
* `--use_old_output_format`: Do not use this option, since it will soon be
removed. Its purpose is to provide compatibility with legacy infrastructure at
if(we).

Now that you are familiar with all of Dory's configuration options, you may
find information on [troubleshooting](troubleshooting.md) helpful.
Expand Down
2 changes: 0 additions & 2 deletions src/dory/client/to_dory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,12 @@
#include <dory/client/tcp_sender.h>
#include <dory/client/unix_dg_sender.h>
#include <dory/client/unix_stream_sender.h>
#include <dory/input_dg/old_v0_input_dg_writer.h>
#include <dory/util/arg_parse_error.h>
#include <tclap/CmdLine.h>

using namespace Base;
using namespace Dory;
using namespace Dory::Client;
using namespace Dory::InputDg;
using namespace Dory::Util;

struct TConfig {
Expand Down
39 changes: 1 addition & 38 deletions src/dory/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,6 @@ static void ParseArgs(int argc, char *argv[], TConfig &config,
"metadata from Kafka in response to an error.", false,
config.MinPauseDelay, "MIN_DELAY_MS");
cmd.add(arg_min_pause_delay);
SwitchArg arg_omit_timestamp("", "omit_timestamp", "Omit timestamps from "
"messages (applicable only when using legacy input format). Do not "
"use this option, since it will soon be removed. Its purpose is to "
"provide compatibility with legacy infrastructure at if(we).", cmd,
config.OmitTimestamp);
ValueArg<decltype(config.DiscardReportInterval)>
arg_discard_report_interval("", "discard_report_interval",
"Discard reporting interval in seconds.", false,
Expand Down Expand Up @@ -336,16 +331,6 @@ static void ParseArgs(int argc, char *argv[], TConfig &config,
SwitchArg arg_topic_autocreate("", "topic_autocreate", "Enable support "
"for automatic topic creation. The Kafka brokers must also be "
"configured to support this.", cmd, config.TopicAutocreate);
SwitchArg arg_use_old_input_format("", "use_old_input_format", "Expect "
"input UNIX datagrams to adhere to old format. Do not use this "
"option, since it will soon be removed. Its purpose is to provide "
"compatibility with legacy infrastructure at if(we).", cmd,
config.UseOldInputFormat);
SwitchArg arg_use_old_output_format("", "use_old_output_format", "Send "
"messages to Kafka using old format. Do not use this option, since "
"it will soon be removed. Its purpose is to provide compatibility "
"with legacy infrastructure at if(we).", cmd,
config.UseOldOutputFormat);
cmd.parse(argc, &arg_vec[0]);
config.ConfigPath = arg_config_path.getValue();
config.LogLevel = StringToLogLevel(arg_log_level.getValue());
Expand Down Expand Up @@ -399,7 +384,6 @@ static void ParseArgs(int argc, char *argv[], TConfig &config,
config.PauseRateLimitMaxDouble =
arg_pause_rate_limit_max_double.getValue();
config.MinPauseDelay = arg_min_pause_delay.getValue();
config.OmitTimestamp = arg_omit_timestamp.getValue();
config.DiscardReportInterval = arg_discard_report_interval.getValue();
config.NoLogDiscard = arg_no_log_discard.getValue();
config.DebugDir = arg_debug_dir.getValue();
Expand All @@ -416,8 +400,6 @@ static void ParseArgs(int argc, char *argv[], TConfig &config,
config.DiscardReportBadMsgPrefixSize =
arg_discard_report_bad_msg_prefix_size.getValue();
config.TopicAutocreate = arg_topic_autocreate.getValue();
config.UseOldInputFormat = arg_use_old_input_format.getValue();
config.UseOldOutputFormat = arg_use_old_output_format.getValue();

if (!arg_receive_socket_name.isSet() &&
!arg_receive_stream_socket_name.isSet() && !arg_input_port.isSet()) {
Expand Down Expand Up @@ -474,7 +456,6 @@ TConfig::TConfig(int argc, char *argv[], bool allow_input_bind_ephemeral)
PauseRateLimitInitial(5000),
PauseRateLimitMaxDouble(4),
MinPauseDelay(5000),
OmitTimestamp(false),
DiscardReportInterval(600),
NoLogDiscard(false),
DebugDir("/home/dory/debug"),
Expand All @@ -485,9 +466,7 @@ TConfig::TConfig(int argc, char *argv[], bool allow_input_bind_ephemeral)
DiscardLogMaxArchiveSize(8 * 1024),
DiscardLogBadMsgPrefixSize(256),
DiscardReportBadMsgPrefixSize(256),
TopicAutocreate(false),
UseOldInputFormat(false),
UseOldOutputFormat(false) {
TopicAutocreate(false) {
ParseArgs(argc, argv, *this, allow_input_bind_ephemeral);
}

Expand Down Expand Up @@ -588,20 +567,8 @@ void Dory::LogConfig(const TConfig &config) {
static_cast<unsigned long>(config.PauseRateLimitMaxDouble));
syslog(LOG_NOTICE, "Minimum pause delay %lu milliseconds",
static_cast<unsigned long>(config.MinPauseDelay));

if (config.UseOldInputFormat) {
syslog(LOG_NOTICE, "Omit timestamp from output: %s",
config.OmitTimestamp ? "true" : "false");
}

syslog(LOG_NOTICE, "Discard reporting interval %lu seconds",
static_cast<unsigned long>(config.DiscardReportInterval));

if (config.UseOldInputFormat) {
syslog(LOG_NOTICE, "Omit writing syslog messages when discards occur: %s",
config.NoLogDiscard ? "true" : "false");
}

syslog(LOG_NOTICE, "Debug directory [%s]", config.DebugDir.c_str());
syslog(LOG_NOTICE, "Message debug time limit %lu seconds",
static_cast<unsigned long>(config.MsgDebugTimeLimit));
Expand All @@ -627,8 +594,4 @@ void Dory::LogConfig(const TConfig &config) {
syslog(LOG_NOTICE, config.TopicAutocreate ?
"Automatic topic creation enabled" :
"Automatic topic creation disabled");
syslog(LOG_NOTICE, "Using %s input datagram format",
config.UseOldInputFormat ? "old" : "new");
syslog(LOG_NOTICE, "Using %s output format",
config.UseOldOutputFormat ? "old" : "new");
}
6 changes: 0 additions & 6 deletions src/dory/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ namespace Dory {

size_t MinPauseDelay;

bool OmitTimestamp;

size_t DiscardReportInterval;

bool NoLogDiscard;
Expand All @@ -120,10 +118,6 @@ namespace Dory {
size_t DiscardReportBadMsgPrefixSize;

bool TopicAutocreate;

bool UseOldInputFormat;

bool UseOldOutputFormat;
}; // TConfig

void LogConfig(const TConfig &config);
Expand Down
2 changes: 1 addition & 1 deletion src/dory/debug/debug_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void TDebugLogger::LogMsg(const TMsg &msg) {
LogEntry += Encoded;
LogEntry += "] value: ";
RawData.clear();
WriteValue(RawData, 0, msg, AddTimestamp, UseOldOutputFormat);
WriteValue(RawData, 0, msg);
Encoded.clear();

if (!RawData.empty()) {
Expand Down
9 changes: 1 addition & 8 deletions src/dory/debug/debug_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,9 @@ namespace Dory {
NO_COPY_SEMANTICS(TDebugLogger);

public:
TDebugLogger(const TDebugSetup &debug_setup, TDebugSetup::TLogId log_id,
bool add_timestamp, bool use_old_output_format)
TDebugLogger(const TDebugSetup &debug_setup, TDebugSetup::TLogId log_id)
: DebugSetup(debug_setup),
LogId(log_id),
AddTimestamp(add_timestamp),
UseOldOutputFormat(use_old_output_format),
Settings(debug_setup.GetSettings()),
LogFd(Settings->GetLogFileDescriptor(log_id)),
CachedSettingsVersion(Settings->GetVersion()),
Expand Down Expand Up @@ -87,10 +84,6 @@ namespace Dory {

const TDebugSetup::TLogId LogId;

const bool AddTimestamp;

const bool UseOldOutputFormat;

TSettings::TPtr Settings;

int LogFd;
Expand Down
13 changes: 5 additions & 8 deletions src/dory/discard_file_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ static void CreateDir(const char *dir) {

TDiscardFileLogger::TDiscardFileLogger()
: MaxMsgPrefixLen(std::numeric_limits<size_t>::max()),
UseOldOutputFormat(false),
Enabled(false),
MaxFileSize(0),
MaxArchiveSize(0) {
Expand All @@ -120,8 +119,7 @@ TDiscardFileLogger::~TDiscardFileLogger() noexcept {
}

void TDiscardFileLogger::Init(const char *log_path, uint64_t max_file_size,
uint64_t max_archive_size, size_t max_msg_prefix_len,
bool use_old_output_format) {
uint64_t max_archive_size, size_t max_msg_prefix_len) {
assert(this);

/* Will contain absolute path of directory containing logfile. */
Expand All @@ -141,7 +139,6 @@ void TDiscardFileLogger::Init(const char *log_path, uint64_t max_file_size,

CreateDir(log_dir.c_str());
MaxMsgPrefixLen = max_msg_prefix_len;
UseOldOutputFormat = use_old_output_format;

/* Since we are executing during server initialization before any threads can
create log entries, we modify our internal state without grabbing 'Mutex'.
Expand Down Expand Up @@ -212,7 +209,7 @@ void TDiscardFileLogger::LogDiscard(const TMsg &msg, TDiscardReason reason) {
WriteKey(key_buf, 0, msg);
EnforceMaxPrefixLen(key_buf);
std::vector<uint8_t> value_buf;
WriteValue(value_buf, 0, msg, false, UseOldOutputFormat);
WriteValue(value_buf, 0, msg);
EnforceMaxPrefixLen(value_buf);
const uint8_t *key_buf_begin = key_buf.empty() ? nullptr : &key_buf[0];
const uint8_t *value_buf_begin = value_buf.empty() ? nullptr : &value_buf[0];
Expand All @@ -232,7 +229,7 @@ void TDiscardFileLogger::LogDuplicate(const TMsg &msg) {
WriteKey(key_buf, 0, msg);
EnforceMaxPrefixLen(key_buf);
std::vector<uint8_t> value_buf;
WriteValue(value_buf, 0, msg, false, UseOldOutputFormat);
WriteValue(value_buf, 0, msg);
EnforceMaxPrefixLen(value_buf);
const uint8_t *key_buf_begin = key_buf.empty() ? nullptr : &key_buf[0];
const uint8_t *value_buf_begin = value_buf.empty() ? nullptr : &value_buf[0];
Expand Down Expand Up @@ -368,7 +365,7 @@ void TDiscardFileLogger::LogBadTopicDiscard(const TMsg &msg) {
WriteKey(key_buf, 0, msg);
EnforceMaxPrefixLen(key_buf);
std::vector<uint8_t> value_buf;
WriteValue(value_buf, 0, msg, false, UseOldOutputFormat);
WriteValue(value_buf, 0, msg);
EnforceMaxPrefixLen(value_buf);
const uint8_t *key_buf_begin = key_buf.empty() ? nullptr : &key_buf[0];
const uint8_t *value_buf_begin = value_buf.empty() ? nullptr : &value_buf[0];
Expand All @@ -388,7 +385,7 @@ void TDiscardFileLogger::LogLongMsgDiscard(const TMsg &msg) {
WriteKey(key_buf, 0, msg);
EnforceMaxPrefixLen(key_buf);
std::vector<uint8_t> value_buf;
WriteValue(value_buf, 0, msg, false, UseOldOutputFormat);
WriteValue(value_buf, 0, msg);
EnforceMaxPrefixLen(value_buf);
const uint8_t *key_buf_begin = key_buf.empty() ? nullptr : &key_buf[0];
const uint8_t *value_buf_begin = value_buf.empty() ? nullptr : &value_buf[0];
Expand Down
5 changes: 1 addition & 4 deletions src/dory/discard_file_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ namespace Dory {
deleted in order from oldest to newest until 'max_archive_size' is no
longer exceeded. */
void Init(const char *log_path, uint64_t max_file_size,
uint64_t max_archive_size, size_t max_msg_prefix_len,
bool use_old_output_format);
uint64_t max_archive_size, size_t max_msg_prefix_len);

/* Call this to disable logging and shut down the thread that deletes old
logfiles. It is harmless to call this method once or multiple times,
Expand Down Expand Up @@ -249,8 +248,6 @@ namespace Dory {

size_t MaxMsgPrefixLen;

bool UseOldOutputFormat;

/* Protects everything below. However, reads of boolean 'Enabled' value
may occur without acquiring 'Mutex' */
std::mutex Mutex;
Expand Down
8 changes: 2 additions & 6 deletions src/dory/dory.test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1202,15 +1202,11 @@ namespace {
TAnomalyTracker::TInfo bad_stuff;
dory->GetAnomalyTracker().GetInfo(bad_stuff);

if (bad_stuff.UnsupportedVersionMsgCount == 0) {
if (bad_stuff.UnsupportedApiKeyMsgCount == 0) {
continue;
}

ASSERT_EQ(bad_stuff.UnsupportedVersionMsgCount, 1U);
auto iter = bad_stuff.UnsupportedVersionMsgs.find(-1);
ASSERT_FALSE(iter == bad_stuff.UnsupportedVersionMsgs.end());
ASSERT_EQ(iter->first, -1);

ASSERT_EQ(bad_stuff.UnsupportedApiKeyMsgCount, 1U);
ASSERT_EQ(bad_stuff.MalformedMsgCount, 0U);
ASSERT_EQ(bad_stuff.MalformedMsgs.size(), 0U);
ASSERT_EQ(bad_stuff.DuplicateTopicMap.size(), 0U);
Expand Down
3 changes: 1 addition & 2 deletions src/dory/dory_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,7 @@ bool TDoryServer::StartMsgHandlingThreads() {
DiscardFileLogger.Init(Config->DiscardLogPath.c_str(),
static_cast<uint64_t>(Config->DiscardLogMaxFileSize) * 1024,
static_cast<uint64_t>(Config->DiscardLogMaxArchiveSize) * 1024,
Config->DiscardLogBadMsgPrefixSize,
Config->UseOldOutputFormat);
Config->DiscardLogBadMsgPrefixSize);
}

if (StreamClientWorkerPool.IsKnown()) {
Expand Down
Loading

0 comments on commit 391d19e

Please sign in to comment.