This repository has been archived by the owner on Sep 27, 2019. It is now read-only.

Commit

Finished merge
jarulraj committed Jul 19, 2016
1 parent 1338f54 commit bb3c6f1
Showing 18 changed files with 31 additions and 674 deletions.
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
@@ -15,7 +15,7 @@ set(SQLParser_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/sql_parser.cpp")
set(SQLScanner_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/sql_scanner.cpp")

if(NOT EXISTS "${SQLScanner_LOCATION}")
message("Building parser for the first time.")
message("Building parser for the first time.")
BISON_TARGET(SQLParser ${PROJECT_SOURCE_DIR}/src/parser/sql_parser.y ${SQLParser_LOCATION})
FLEX_TARGET(SQLScanner ${PROJECT_SOURCE_DIR}/src/parser/sql_scanner.l ${SQLScanner_LOCATION})
ADD_FLEX_BISON_DEPENDENCY(SQLScanner SQLParser)
5 changes: 5 additions & 0 deletions src/common/configuration.cpp
@@ -32,3 +32,8 @@ char *peloton_log_directory;
int64_t peloton_wait_timeout;

int peloton_flush_frequency_micros;

int peloton_flush_mode;

// pcommit latency (for NVM WBL)
int peloton_pcommit_latency;
6 changes: 0 additions & 6 deletions src/include/benchmark/logger/logger_configuration.h
@@ -74,12 +74,6 @@ class configuration {
// Benchmark type
BenchmarkType benchmark_type;

int replication_port;

char *remote_endpoint;

ReplicationType replication_mode;

// clflush or clwb
int flush_mode;

22 changes: 0 additions & 22 deletions src/include/logging/frontend_logger.h
@@ -28,8 +28,6 @@
#include "logging/backend_logger.h"
#include "logging/checkpoint.h"

extern char *peloton_endpoint_address;

namespace peloton {
namespace logging {
//===--------------------------------------------------------------------===//
@@ -111,13 +109,6 @@ class FrontendLogger : public Logger {
backend_loggers_lock.Unlock();
}

void RemoteDone(long sequence_number) {
while (sequence_number > remote_done_.load()) {
long old = remote_done_.load();
remote_done_.compare_exchange_weak(old, sequence_number);
}
}

void SetNoWrite(bool no_write) { no_write_ = no_write; }

protected:
@@ -144,23 +135,10 @@

cid_t max_seen_commit_id = 0;

// variables for replication
std::unique_ptr<networking::PelotonLoggingService_Stub> replication_stub_;
std::unique_ptr<networking::RpcChannel> channel_;
std::unique_ptr<networking::RpcController> controller_;

bool replicating_ = false;

networking::ResponseType replication_mode_;

bool no_write_ = false;

bool test_mode_ = false;

std::atomic<long> remote_done_;

long replication_seq_ = 1;

bool is_distinguished_logger = false;
};

2 changes: 0 additions & 2 deletions src/include/logging/log_manager.h
@@ -30,8 +30,6 @@
//===--------------------------------------------------------------------===//
extern LoggingType peloton_logging_mode;

extern char *peloton_endpoint_address;

namespace peloton {
namespace logging {

6 changes: 2 additions & 4 deletions src/include/logging/loggers/wbl_backend_logger.h
@@ -13,14 +13,12 @@

#pragma once

#include <unordered_set>

#include "common/types.h"
#include "logging/backend_logger.h"
#include "concurrency/transaction_manager_factory.h"

#include "unordered_set"

extern char *peloton_endpoint_address;

namespace peloton {
namespace logging {

23 changes: 1 addition & 22 deletions src/logging/frontend_logger.cpp
@@ -25,32 +25,11 @@
// configuration for testing
extern int64_t peloton_wait_timeout;

extern ReplicationType peloton_replication_mode;

namespace peloton {
namespace logging {

FrontendLogger::FrontendLogger() {
switch (peloton_replication_mode) {
case ReplicationType::SYNC_REPLICATION:
replication_mode_ = networking::SYNC;
break;
case ReplicationType::ASYNC_REPLICATION:
default:
replication_mode_ = networking::ASYNC;
break;
case ReplicationType::SEMISYNC_REPLICATION:
replication_mode_ = networking::SEMISYNC;
break;
}
if (peloton_endpoint_address != nullptr) {
remote_done_ = 0;
replicating_ = true;
controller_.reset(new networking::RpcController());
channel_.reset(new networking::RpcChannel(peloton_endpoint_address));
replication_stub_.reset(
new networking::PelotonLoggingService_Stub(channel_.get()));
}

logger_type = LOGGER_TYPE_FRONTEND;

// Set wait timeout
46 changes: 0 additions & 46 deletions src/logging/loggers/wal_frontend_logger.cpp
@@ -132,23 +132,9 @@ void WriteAheadFrontendLogger::FlushLogRecords(void) {
}
}

size_t write_size = 0;
size_t rep_array_offset = 0;
std::unique_ptr<char> replication_array = nullptr;
TransactionRecord delimiter_rec(LOGRECORD_TYPE_ITERATION_DELIMITER,
this->max_collected_commit_id);
delimiter_rec.Serialize(output_buffer);
if (replicating_) {
// find the size we need to write out
for (oid_t global_queue_itr = 0; global_queue_itr < global_queue_size;
global_queue_itr++) {
write_size += global_queue[global_queue_itr]->GetSize();
}
if (max_collected_commit_id != max_flushed_commit_id) {
write_size += delimiter_rec.GetMessageLength();
}
replication_array.reset(new char[write_size]);
}

// First, write all the record in the queue
for (oid_t global_queue_itr = 0; global_queue_itr < global_queue_size;
@@ -160,11 +146,6 @@ void WriteAheadFrontendLogger::FlushLogRecords(void) {
cur_file_handle.file);
}

if (replicating_) {
memcpy(replication_array.get() + rep_array_offset, log_buffer->GetData(),
log_buffer->GetSize());
rep_array_offset += log_buffer->GetSize();
}
LOG_TRACE("Log buffer get max log id returned %d",
(int)log_buffer->GetMaxLogId());

@@ -180,26 +161,6 @@ void WriteAheadFrontendLogger::FlushLogRecords(void) {
}

bool flushed = false;
long rep_seq_number = 0;
// send to remote before fsyncing, then wait for the response after fsync
if (replicating_ && write_size > 0) {
if (max_collected_commit_id != max_flushed_commit_id) {
memcpy(replication_array.get() + rep_array_offset,
delimiter_rec.GetMessage(), delimiter_rec.GetMessageLength());
// send the request
rep_array_offset += delimiter_rec.GetMessageLength();
}
networking::LogRecordReplayRequest request;
request.set_log(replication_array.get(), write_size);
request.set_sync_type(replication_mode_);
rep_seq_number = replication_seq_++;
request.set_sequence_number(rep_seq_number);
assert(request.sequence_number() != 0);
networking::LogRecordReplayResponse response;
remote_done_ = false;
replication_stub_->LogRecordReplay(controller_.get(), &request, &response,
nullptr);
}

if (max_collected_commit_id != max_flushed_commit_id) {
if (!test_mode_) {
@@ -245,13 +206,6 @@ void WriteAheadFrontendLogger::FlushLogRecords(void) {
}
}
}
// if replicating and doing sync or semisync wait here
if (rep_seq_number > 0 && (replication_mode_ == networking::SYNC ||
replication_mode_ == networking::SEMISYNC)) {
// wait for the response with the proper sequence number
while (remote_done_.load() < rep_seq_number)
;
}

/* For now, fflush after every iteration of collecting buffers */
// Clean up the frontend logger's queue
1 change: 0 additions & 1 deletion src/logging/loggers/wbl_backend_logger.cpp
@@ -33,7 +33,6 @@ void WriteBehindBackendLogger::Log(LogRecord *record) {
}
}
log_buffer_lock.Lock();
cid_t cur_log_id = record->GetTransactionId();
switch (record->GetType()) {
case LOGRECORD_TYPE_TRANSACTION_COMMIT:
highest_logged_commit_message = record->GetTransactionId();
64 changes: 0 additions & 64 deletions src/logging/loggers/wbl_frontend_logger.cpp
@@ -93,76 +93,12 @@ void WriteBehindFrontendLogger::FlushLogRecords(void) {
LOG_ERROR("Unable to write log record");
}
}
long curr_seq_num = 0;
size_t global_queue_size = global_queue.size();
if (replicating_ && global_queue_size > 0) {
size_t global_queue_size = global_queue.size();

size_t write_size = 0;
size_t rep_array_offset = 0;
std::unique_ptr<char> replication_array = nullptr;
TransactionRecord delimiter_rec(LOGRECORD_TYPE_ITERATION_DELIMITER,
this->max_collected_commit_id);
delimiter_rec.Serialize(output_buffer);

// find the size we need to write out
for (oid_t global_queue_itr = 0; global_queue_itr < global_queue_size;
global_queue_itr++) {
write_size += global_queue[global_queue_itr]->GetSize();
}
if (max_collected_commit_id != max_flushed_commit_id) {
write_size += delimiter_rec.GetMessageLength();
}
replication_array.reset(new char[write_size]);

for (oid_t global_queue_itr = 0; global_queue_itr < global_queue_size;
global_queue_itr++) {
auto &log_buffer = global_queue[global_queue_itr];

memcpy(replication_array.get() + rep_array_offset, log_buffer->GetData(),
log_buffer->GetSize());
rep_array_offset += log_buffer->GetSize();

// return empty buffer
auto backend_logger = log_buffer->GetBackendLogger();
log_buffer->ResetData();
backend_logger->GrantEmptyBuffer(std::move(log_buffer));
}

if (max_collected_commit_id != max_flushed_commit_id) {
PL_ASSERT(rep_array_offset + delimiter_rec.GetMessageLength() ==
write_size);
memcpy(replication_array.get() + rep_array_offset,
delimiter_rec.GetMessage(), delimiter_rec.GetMessageLength());
// send the request
rep_array_offset += delimiter_rec.GetMessageLength();
}

networking::LogRecordReplayRequest request;
request.set_log(replication_array.get(), write_size);
curr_seq_num = replication_seq_++;
request.set_sync_type(replication_mode_);
request.set_sequence_number(curr_seq_num);
networking::LogRecordReplayResponse response;
replication_stub_->LogRecordReplay(controller_.get(), &request, &response,
nullptr);

global_queue.clear();
}

// for now fsync every time because the cost is relatively low
if (fsync(log_file_fd)) {
LOG_ERROR("Unable to fsync log");
}

// if replicating in the proper mode, wait here
if (curr_seq_num > 0 && (replication_mode_ == networking::SYNC ||
replication_mode_ == networking::SEMISYNC)) {
// wait for the response with the proper sequence number
while (remote_done_.load() < curr_seq_num)
;
}

// inform backend loggers they can proceed if waiting for sync
max_flushed_commit_id = max_collected_commit_id;
auto &manager = LogManager::GetInstance();
