Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New zip log #7

Open
wants to merge 9 commits into
base: kepler0.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 30 additions & 29 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CXX = g++
SHARED_CFLAGS = -fPIC
SHARED_LDFLAGS = -shared -Wl,-soname -Wl,

INCPATH += -I./src -I./include $(DEPS_INCPATH)
INCPATH += -I./src -I./include -I./src/leveldb/include -I./src/leveldb $(DEPS_INCPATH)
CFLAGS += -std=c99 $(OPT) $(SHARED_CFLAGS) $(INCPATH)
CXXFLAGS += $(OPT) $(SHARED_CFLAGS) $(INCPATH)
LDFLAGS += -rdynamic $(DEPS_LDPATH) $(DEPS_LDFLAGS) -lpthread -lrt -lz -ldl
Expand All @@ -19,6 +19,7 @@ PROTO_FILES := $(wildcard src/proto/*.proto)
PROTO_OUT_CC := $(PROTO_FILES:.proto=.pb.cc)
PROTO_OUT_H := $(PROTO_FILES:.proto=.pb.h)

############################################################
SDK_SRC := $(wildcard src/sdk/*.cc)
COMMON_SRC := $(wildcard src/common/*.cc)
UTIL_SRC := $(wildcard src/util/*.cc)
Expand All @@ -34,17 +35,13 @@ MULWRITE_TEST_SRC := $(wildcard src/benchmark/mulcli_write_test.cc)
SCAN_TEST_SRC := $(wildcard src/benchmark/scan_test.cc)
C_SAMPLE_SRC := $(wildcard src/sample/c_sample.c)

###########################
# trace collector #
###########################
FTRACE_SRC := $(wildcard src/ftrace/collector/*.cc)
FTRACE_TEST_SRC := $(wildcard src/ftrace/test/TEST_log.cc)

###########################
# search engine #
###########################
FTRACE_SEARCHENGINE_SRC := $(wildcard src/ftrace/search_engine/*.cc)
FTRACE_TEST_SRC := $(wildcard src/ftrace/collector/test/*.cc)
AGENT_SRC := $(wildcard src/agent/*.cc)
COLLECTOR_SRC := $(wildcard src/collector/*.cc)
SCHEDULER_SRC := $(wildcard src/scheduler/*.cc)

############################################################
SDK_OBJ := $(SDK_SRC:.cc=.o)
COMMON_OBJ := $(COMMON_SRC:.cc=.o)
UTIL_OBJ := $(UTIL_SRC:.cc=.o)
Expand All @@ -60,23 +57,24 @@ MULWRITE_TEST_OBJ := $(MULWRITE_TEST_SRC:.cc=.o)
SCAN_TEST_OBJ := $(SCAN_TEST_SRC:.cc=.o)
C_SAMPLE_OBJ := $(C_SAMPLE_SRC:.c=.o)

###########################
# trace collector
###########################
FTRACE_OBJ := $(FTRACE_SRC:.cc=.o)
FTRACE_TEST_OBJ := $(FTRACE_TEST_SRC:.cc=.o)

###########################
# search engine #
###########################
FTRACE_SEARCHENGINE_OBJ := $(FTRACE_SEARCHENGINE_SRC:.cc=.o)
AGENT_OBJ := $(AGENT_SRC:.cc=.o)
COLLECTOR_OBJ := $(COLLECTOR_SRC:.cc=.o)
SCHEDULER_OBJ := $(SCHEDULER_SRC:.cc=.o)

CXX_OBJ := $(SDK_OBJ) $(COMMON_OBJ) $(UTIL_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) \
$(SAMPLE_OBJ) $(MDTTOOL_OBJ) $(WRITE_TEST_OBJ) $(MULWRITE_TEST_OBJ) \
$(SCAN_TEST_OBJ) $(SYNC_WRITE_TEST_OBJ) $(UPDATESCHEMA_OBJ) $(DUMPFILE_OBJ)
$(SCAN_TEST_OBJ) $(SYNC_WRITE_TEST_OBJ) $(UPDATESCHEMA_OBJ) $(DUMPFILE_OBJ) \
$(COLLECTOR_OBJ) $(SCHEDULER_OBJ)
C_OBJ := $(C_SAMPLE_OBJ)
LEVELDB_LIB := src/leveldb/libleveldb.a

############################################################
PROGRAM = agent_main collector_main scheduler_main
FTRACELIBRARY = libftrace.a
FTRACE_TEST = TEST_log

PROGRAM =
LIBRARY = libmdt.a
SAMPLE = sample
MDTTOOL = mdt-tool
Expand All @@ -89,12 +87,6 @@ MULWRITE_TEST = mulcli_write_test
SCAN_TEST = scan_test
C_SAMPLE = c_sample

###########################
# trace collector
###########################
FTRACELIBRARY = libftrace.a
FTRACE_TEST = TEST_log

.PHONY: all clean cleanall test
all: $(PROGRAM) $(LIBRARY) $(FTRACELIBRARY) $(SAMPLE) $(C_SAMPLE) $(MDTTOOL) $(WRITE_TEST) $(DUMPFILE) $(MULWRITE_TEST) $(SCAN_TEST) $(SYNC_WRITE_TEST) $(UPDATESCHEMA) $(FTRACE_TEST)
mkdir -p build/include build/lib build/bin
Expand All @@ -108,7 +100,7 @@ all: $(PROGRAM) $(LIBRARY) $(FTRACELIBRARY) $(SAMPLE) $(C_SAMPLE) $(MDTTOOL) $(W
echo 'Done'

clean:
rm -rf $(CXX_OBJ) $(C_OBJ) $(FTRACE_OBJ)
rm -rf $(CXX_OBJ) $(C_OBJ) $(FTRACE_OBJ) $(AGENT_OBJ) $(SCHEDULER_OBJ) $(COLLECTOR_OBJ)
rm -rf $(PROGRAM) $(LIBRARY) $(FTRACELIBRARY) $(SAMPLE) $(C_SAMPLE) $(MDTTOOL) $(WRITE_TEST) $(MULWRITE_TEST) $(SCAN_TEST) $(SYNC_WRITE_TEST) $(UPDATESCHEMA) $(FTRACE_TEST)
rm -rf search_service
rm -rf $(DUMPFILE)
Expand All @@ -117,6 +109,9 @@ cleanall:
$(MAKE) clean
rm -rf build

src/leveldb/libleveldb.a: FORCE
$(MAKE) -C src/leveldb

sample: $(SAMPLE_OBJ) $(LIBRARY)
$(CXX) -o $@ $(SAMPLE_OBJ) $(LIBRARY) $(LDFLAGS)

Expand Down Expand Up @@ -153,8 +148,14 @@ libftrace.a: $(FTRACE_OBJ) $(PROTO_OBJ) $(VERSION_OBJ)
TEST_log: $(FTRACE_TEST_OBJ) $(FTRACELIBRARY)
$(CXX) -o $@ $(FTRACE_TEST_OBJ) $(FTRACELIBRARY) $(LDFLAGS)

search_service: $(FTRACE_SEARCHENGINE_OBJ) $(LIBRARY)
$(CXX) -o search_service $(FTRACE_SEARCHENGINE_OBJ) $(LIBRARY) $(LDFLAGS)
agent_main: $(AGENT_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) $(LEVELDB_LIB)
$(CXX) -o agent_main $(AGENT_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) $(LDFLAGS) $(LEVELDB_LIB)

collector_main: $(COLLECTOR_OBJ) $(LIBRARY)
$(CXX) -o collector_main $(COLLECTOR_OBJ) $(LIBRARY) $(LDFLAGS)

scheduler_main: $(SCHEDULER_OBJ) $(PROTO_OBJ) $(VERSION_OBJ)
$(CXX) -o scheduler_main $(SCHEDULER_OBJ) $(PROTO_OBJ) $(VERSION_OBJ) $(LDFLAGS)

$(CXX_OBJ): %.o: %.cc $(PROTO_OUT_H)
$(CXX) $(CXXFLAGS) -c $< -o $@
Expand Down
83 changes: 83 additions & 0 deletions src/agent/agent_enter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#include "agent/agent_impl.h"
#include <gflags/gflags.h>
#include <unistd.h>
#include <sofa/pbrpc/pbrpc.h>
#include <glog/logging.h>
#include <tera.h>

DECLARE_string(agent_service_port);
DECLARE_string(log_dir);

void SetupLog(const std::string& name) {
std::string program_name = "mdt";
if (!name.empty()) {
program_name = name;
}

if (FLAGS_log_dir.size() == 0) {
if (access("../log", F_OK)) {
FLAGS_log_dir = "../log";
} else {
FLAGS_log_dir = "./log";
}
}

if (access(FLAGS_log_dir.c_str(), F_OK)) {
mkdir(FLAGS_log_dir.c_str(), 0777);
}

std::string log_filename = FLAGS_log_dir + "/" + program_name + ".INFO.";
std::string wf_filename = FLAGS_log_dir + "/" + program_name + ".WARNING.";
google::SetLogDestination(google::INFO, log_filename.c_str());
google::SetLogDestination(google::WARNING, wf_filename.c_str());
google::SetLogDestination(google::ERROR, "");
google::SetLogDestination(google::FATAL, "");

google::SetLogSymlink(google::INFO, program_name.c_str());
google::SetLogSymlink(google::WARNING, program_name.c_str());
google::SetLogSymlink(google::ERROR, "");
google::SetLogSymlink(google::FATAL, "");
}

static pthread_once_t glog_once = PTHREAD_ONCE_INIT;
static void InternalSetupGoogleLog() {
// init param, setup log
std::string log_prefix = "agent";
::google::InitGoogleLogging(log_prefix.c_str());
SetupLog(log_prefix);
tera::Client::SetGlogIsInitialized();
LOG(INFO) << "start loging...";
}

void SetupGoogleLog() {
pthread_once(&glog_once, InternalSetupGoogleLog);
}

int main(int ac, char* av[]) {
::google::ParseCommandLineFlags(&ac, &av, true);
SetupGoogleLog();
// run agent
mdt::agent::AgentImpl* agent = new mdt::agent::AgentImpl();
if (agent == NULL) {
std::cout << "can not new log agent\n";
exit(-1);
}
if (agent->Init() < 0) {
std::cout << "agent init error\n";
exit(-1);
}

// register rpc service
::sofa::pbrpc::RpcServerOptions options;
::sofa::pbrpc::RpcServer rpc_server(options);
if (!rpc_server.RegisterService(agent)) {
return -1;
}
std::string hostip = std::string("0.0.0.0:") + FLAGS_agent_service_port;
if (!rpc_server.Start(hostip)) {
return -1;
}
rpc_server.Run();
return 0;
}

55 changes: 55 additions & 0 deletions src/agent/agent_flags.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include <sys/inotify.h>
#include "agent/options.h"
#include <gflags/gflags.h>

mdt::agent::EventMask event_masks[] = {
{IN_ACCESS , "IN_ACCESS"} ,
{IN_ATTRIB , "IN_ATTRIB"} ,
{IN_CLOSE_WRITE , "IN_CLOSE_WRITE"} ,
{IN_CLOSE_NOWRITE , "IN_CLOSE_NOWRITE"} ,
{IN_CREATE , "IN_CREATE"} ,
{IN_DELETE , "IN_DELETE"} ,
{IN_DELETE_SELF , "IN_DELETE_SELF"} ,
{IN_MODIFY , "IN_MODIFY"} ,
{IN_MOVE_SELF , "IN_MOVE_SELF"} ,
{IN_MOVED_FROM , "IN_MOVED_FROM"} ,
{IN_MOVED_TO , "IN_MOVED_TO"} ,
{IN_OPEN , "IN_OPEN"} ,
{IN_DONT_FOLLOW , "IN_DONT_FOLLOW"} ,
{IN_EXCL_UNLINK , "IN_EXCL_UNLINK"} ,
{IN_MASK_ADD , "IN_MASK_ADD"} ,
{IN_ONESHOT , "IN_ONESHOT"} ,
{IN_ONLYDIR , "IN_ONLYDIR"} ,
{IN_IGNORED , "IN_IGNORED"} ,
{IN_ISDIR , "IN_ISDIR"} ,
{IN_Q_OVERFLOW , "IN_Q_OVERFLOW"} ,
{IN_UNMOUNT , "IN_UNMOUNT"} ,
};

DEFINE_string(agent_service_port, "33331", "agent port");
DEFINE_int32(file_stream_max_pending_request, 10000, "max pending write req");
DEFINE_string(scheduler_addr, "0.0.0.0:11111", "scheduler server addr");
DEFINE_string(db_dir, "../leveldb_dir/", "leveldb dir for cp");
DEFINE_string(watch_log_dir, "../watch_log_dir/", "log dir");
DEFINE_string(module_name_list, "tabletnode.1.", "identify module name");

DEFINE_string(db_name, "mdttrace-debug", "production name");
DEFINE_string(table_name, "trace", "table name");
DEFINE_string(primary_key, "", "primary key name");
DEFINE_string(user_time, "", "user point out which field use as timestamp");
DEFINE_int32(time_type, 1, "use for parse user time from log");

// split string by substring
//DEFINE_string(string_delims, "||", "split string by substring");
DEFINE_string(string_delims, "", "split string by substring");

// split string by char
DEFINE_string(line_delims, " ", "log tailer's line delim");
DEFINE_string(kv_delims, "=", "log tailer's kv delim");
DEFINE_bool(enable_index_filter, false, "do not filter log line by index list in agent");
DEFINE_string(index_list, "", "index table name list");

// split string by index number
DEFINE_bool(use_fixed_index_list, true, "use fixed index list");
DEFINE_string(fixed_index_list, "url:5,time:2", "use for fix index list match");

Loading