Skip to content
This repository has been archived by the owner on Nov 5, 2022. It is now read-only.

Commit

Permalink
Export of Google internal changes to Ripple.
Browse files Browse the repository at this point in the history
1. Add a missing shared lock of position_mutex_
2. Name all locks of position_mutex_ consistently as position_lock
3. Demote some position_mutex_ locks from exclusive to shared
4. Add thread safety annotations for Binlog::position_
5. Annotate the acquisition order of mutexes in class Binlog
6. Applied ClangTidy fixes to Ripple.

PiperOrigin-RevId: 223357382
  • Loading branch information
nerdatmath authored and pivanof committed Feb 12, 2019
1 parent fab4950 commit 3b76275
Show file tree
Hide file tree
Showing 18 changed files with 78 additions and 72 deletions.
13 changes: 7 additions & 6 deletions binlog.cc
Expand Up @@ -411,7 +411,7 @@ int Binlog::Recover() {
return -1;
}

absl::MutexLock lock(&position_mutex_);
absl::MutexLock position_lock(&position_mutex_);
position_ = pos;

LOG(INFO) << "Binlog recovery complete\n"
Expand All @@ -430,6 +430,7 @@ bool Binlog::IsOpen() const {

// Close an opened binlog.
bool Binlog::Close() {
absl::ReaderMutexLock position_lock(&position_mutex_);
absl::MutexLock file_lock(&file_mutex_);
if (binlog_file_ != nullptr)
CloseFileLocked();
Expand Down Expand Up @@ -464,7 +465,7 @@ bool Binlog::GetPosition(const GTIDList &pos, BinlogPosition *dst,

// Get local binlog position, file/pos which is currently being written to.
BinlogPosition Binlog::GetBinlogPosition() {
absl::MutexLock lock(&position_mutex_);
absl::ReaderMutexLock position_lock(&position_mutex_);
return position_;
}

Expand Down Expand Up @@ -494,8 +495,8 @@ bool Binlog::GetNextFile(FilePosition *pos) const {
bool Binlog::WaitBinlogEndPosition(FilePosition *pos,
int64_t *truncate_counter,
absl::Duration timeout) {
absl::MutexLock lock(&position_mutex_);
auto check = [this, pos]() {
absl::ReaderMutexLock position_lock(&position_mutex_);
auto check = [this, pos]() SHARED_LOCKS_REQUIRED(position_mutex_) {
return stop_ || !position_.latest_completed_gtid_position.equal(*pos);
};
position_mutex_.AwaitWithTimeout(absl::Condition(&check), timeout);
Expand All @@ -518,7 +519,7 @@ bool Binlog::WaitBinlogEndPosition(FilePosition *pos,
}

void Binlog::Stop() {
absl::MutexLock lock(&position_mutex_);
absl::MutexLock position_lock(&position_mutex_);
stop_ = true;
}

Expand Down Expand Up @@ -952,7 +953,7 @@ bool Binlog::PurgeLogsUntil(absl::string_view to_file,
}

{
absl::MutexLock lock(&position_mutex_);
absl::MutexLock position_lock(&position_mutex_);
position_.gtid_purged = index_.GetOldestEntry().start_position;
}

Expand Down
56 changes: 31 additions & 25 deletions binlog.h
Expand Up @@ -47,38 +47,39 @@ class Binlog : public BinlogReader::BinlogInterface {
public:
explicit Binlog(const char *directory, int64_t max_binlog_size,
const file::Factory &ff);
virtual ~Binlog() LOCKS_EXCLUDED(file_mutex_);
virtual ~Binlog() LOCKS_EXCLUDED(file_mutex_, position_mutex_);

// Create a binlog file and open if non-exists.
// If a binlog already exists return false.
// Not thread safe.
virtual bool Create() LOCKS_EXCLUDED(file_mutex_);
virtual bool Create() LOCKS_EXCLUDED(file_mutex_, position_mutex_);

// Create binlog with start position and open if non-exists.
// This is used for -ripple_requested_start_gtid_position.
// If a binlog already exists return false.
// Not thread safe.
virtual bool Create(const GTIDList &) LOCKS_EXCLUDED(file_mutex_);
virtual bool Create(const GTIDList &)
LOCKS_EXCLUDED(file_mutex_, position_mutex_);

// Open binlog and recover/discard unfinished entries.
// return 0 - no state found on disk
// 1 - state recovered
// -1 - an error/inconsistency
// Not thread safe.
virtual int Recover() LOCKS_EXCLUDED(file_mutex_);
virtual int Recover() LOCKS_EXCLUDED(file_mutex_, position_mutex_);

// Close the binlog, if open.
// Always returns true.
// Thread safe.
virtual bool Close() LOCKS_EXCLUDED(file_mutex_);
virtual bool Close() LOCKS_EXCLUDED(file_mutex_, position_mutex_);

// Check if binlog is open.
// Thread safe.
virtual bool IsOpen() const LOCKS_EXCLUDED(file_mutex_);

// Get binlog position.
// Thread safe.
virtual BinlogPosition GetBinlogPosition();
virtual BinlogPosition GetBinlogPosition() LOCKS_EXCLUDED(position_mutex_);

// Wait for a file position other than pos (for BinlogReader)
// and store current end position in *pos.
Expand All @@ -88,7 +89,7 @@ class Binlog : public BinlogReader::BinlogInterface {
bool WaitBinlogEndPosition(FilePosition *pos,
int64_t *truncate_counter,
absl::Duration timeout) override
LOCKS_EXCLUDED(file_mutex_);
LOCKS_EXCLUDED(file_mutex_, position_mutex_);

// Connection established.
// Thread safe.
Expand All @@ -97,16 +98,17 @@ class Binlog : public BinlogReader::BinlogInterface {
// Add an event to binlog.
// If wait is true, this method blocks until event has been written to disk.
virtual bool AddEvent(RawLogEventData event, bool wait)
LOCKS_EXCLUDED(file_mutex_);
LOCKS_EXCLUDED(file_mutex_, position_mutex_);

// Switch local binlog file.
// Store name of new file in newfile.
virtual bool SwitchFile(std::string *newfile) LOCKS_EXCLUDED(file_mutex_);
virtual bool SwitchFile(std::string *newfile)
LOCKS_EXCLUDED(file_mutex_, position_mutex_);

// Connection closed.
// Thread safe.
virtual void ConnectionClosed(const mysql::ClientConnection *)
LOCKS_EXCLUDED(file_mutex_);
LOCKS_EXCLUDED(file_mutex_, position_mutex_);

// Register/unregister a binlog reader.
// This is used when purging logs so that we don't purge too far.
Expand Down Expand Up @@ -139,27 +141,30 @@ class Binlog : public BinlogReader::BinlogInterface {

// "Stop" binlog.
// Wake up all binlog readers waiting for more data.
virtual void Stop();
virtual void Stop() LOCKS_EXCLUDED(position_mutex_);

// Purge logs.
// On success, store name of oldest kept file in oldest_file.
virtual bool PurgeLogs(std::string *oldest_file);
virtual bool PurgeLogs(std::string *oldest_file)
LOCKS_EXCLUDED(position_mutex_);

// Purge logs with st_mtime < before_time.
// On success, store name of oldest kept file in oldest_file.
virtual bool PurgeLogsBefore(absl::Time before_time,
std::string *oldest_file);
virtual bool PurgeLogsBefore(absl::Time before_time, std::string *oldest_file)
LOCKS_EXCLUDED(position_mutex_);

// Purge logs, keeping at least keep_size bytes.
// On success, store name of oldest kept file in oldest_file.
virtual bool PurgeLogsKeepSize(size_t keep_size, std::string *oldest_file);
virtual bool PurgeLogsKeepSize(size_t keep_size, std::string *oldest_file)
LOCKS_EXCLUDED(position_mutex_);

// Purge logs up until not including to_file.
// On success, store the name of oldest kept file in oldest_file.
// Note that purge might stop short of to_file since a slave
// might be using an older file.
virtual bool PurgeLogsUntil(absl::string_view to_file,
std::string *oldest_file);
std::string *oldest_file)
LOCKS_EXCLUDED(position_mutex_);

private:
//
Expand All @@ -169,9 +174,7 @@ class Binlog : public BinlogReader::BinlogInterface {
absl::Mutex position_mutex_;

// This mutex prevents concurrent access to binlog_file_
// To prevent deadlocks, always obtain position_mutex_ before obtaining
// file_mutex_, and release in the reverse order.
mutable absl::Mutex file_mutex_;
mutable absl::Mutex file_mutex_ ACQUIRED_AFTER(position_mutex_);

// directory for binlog index+files
const std::string directory_;
Expand All @@ -190,7 +193,7 @@ class Binlog : public BinlogReader::BinlogInterface {
BinlogIndex index_;

// The binlog position.
BinlogPosition position_;
BinlogPosition position_ GUARDED_BY(position_mutex_);

// The file position of the last GTID that has been fully flushed to storage.
FilePosition flushed_gtid_position_;
Expand Down Expand Up @@ -225,7 +228,7 @@ class Binlog : public BinlogReader::BinlogInterface {
// On failure, it will not be open
bool CreateNewFile(const GTIDList& start_pos,
const FilePosition& master_pos)
LOCKS_EXCLUDED(file_mutex_);
LOCKS_EXCLUDED(file_mutex_, position_mutex_);

// Create a new binlog file (and add it to binlog index).
// Shall be called with file_mutex_ & position_mutex_ locked.
Expand All @@ -234,7 +237,7 @@ class Binlog : public BinlogReader::BinlogInterface {
// On failure, it will not be open
bool CreateNewFileLocked(const GTIDList& start_pos,
const FilePosition& master_pos)
EXCLUSIVE_LOCKS_REQUIRED(file_mutex_);
EXCLUSIVE_LOCKS_REQUIRED(file_mutex_, position_mutex_);

// Write start encryption event (if encryption is enabled).
bool WriteCryptInfo(file::AppendOnlyFile *file);
Expand All @@ -246,14 +249,16 @@ class Binlog : public BinlogReader::BinlogInterface {

// Write format descriptor for mysqld (and optionally StartEncryption)
// to start of binlog file.
bool WriteMasterFormatDescriptor() EXCLUSIVE_LOCKS_REQUIRED(file_mutex_);
bool WriteMasterFormatDescriptor()
EXCLUSIVE_LOCKS_REQUIRED(file_mutex_, position_mutex_);

// SwitchFile, shall be called with file_mutex_ & position_mutex_ locked.
// On entry the binlog must be open
// On return, a new binlog file will be open
// Returns true iff finalizing the old file and marking it for archival were
// successful.
bool SwitchFileLocked() EXCLUSIVE_LOCKS_REQUIRED(file_mutex_);
bool SwitchFileLocked()
EXCLUSIVE_LOCKS_REQUIRED(file_mutex_, position_mutex_);

// Check if this event shall be written to disk.
bool SkipWritingEvent(RawLogEventData event) const;
Expand All @@ -279,7 +284,8 @@ class Binlog : public BinlogReader::BinlogInterface {
// Close an opened binlog.
// On entry the file must be open and file_mutex_ must be held.
// On return, it will be closed and file_mutex_ remains held.
void CloseFileLocked() EXCLUSIVE_LOCKS_REQUIRED(file_mutex_);
void CloseFileLocked() EXCLUSIVE_LOCKS_REQUIRED(file_mutex_)
SHARED_LOCKS_REQUIRED(position_mutex_);

Binlog(Binlog&&) = delete;
Binlog(const Binlog&) = delete;
Expand Down
3 changes: 1 addition & 2 deletions binlog_index.h
Expand Up @@ -17,8 +17,7 @@
#ifndef MYSQL_RIPPLE_BINLOG_INDEX_H
#define MYSQL_RIPPLE_BINLOG_INDEX_H

#include <stdio.h>

#include <cstdio>
#include <string>
#include <vector>

Expand Down
4 changes: 2 additions & 2 deletions binlog_index_unittest.cc
Expand Up @@ -224,7 +224,7 @@ TEST(Binlog_index, Recover) {

// Add one file
GTIDList start_pos;
if (exp.start_pos.size()) {
if (!exp.start_pos.empty()) {
EXPECT_TRUE(start_pos.Parse(exp.start_pos));
}
EXPECT_TRUE(index.NewEntry(start_pos, FilePosition("master-bin", 0)));
Expand Down Expand Up @@ -399,7 +399,7 @@ bool Create(BinlogIndex *index,

for (const auto& entry : entries) {
GTIDList start_pos;
if (entry.first.size()) {
if (!entry.first.empty()) {
if (!start_pos.Parse(entry.first)) return false;
}
if (!index->NewEntry(start_pos, FilePosition("master-bin", 0)))
Expand Down
2 changes: 1 addition & 1 deletion binlog_reader.h
Expand Up @@ -17,7 +17,7 @@
#ifndef MYSQL_RIPPLE_BINLOG_READER_H
#define MYSQL_RIPPLE_BINLOG_READER_H

#include <stdio.h>
#include <cstdio>
#include <vector>

#include "absl/strings/string_view.h"
Expand Down
3 changes: 1 addition & 2 deletions buffer.h
Expand Up @@ -17,9 +17,8 @@
#ifndef MYSQL_RIPPLE_BUFFER_H
#define MYSQL_RIPPLE_BUFFER_H

#include <stdlib.h>

#include <cstdint>
#include <cstdlib>
#include <vector>

#include "absl/strings/string_view.h"
Expand Down
2 changes: 1 addition & 1 deletion connection.cc
Expand Up @@ -14,7 +14,7 @@

#include "connection.h"

#include <stdio.h>
#include <cstdio>

namespace mysql_ripple {

Expand Down
5 changes: 3 additions & 2 deletions encryption.cc
Expand Up @@ -14,10 +14,11 @@

#include "encryption.h"

#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h> // htonl

#include <cstdlib>
#include <cstring>

#include "absl/strings/string_view.h"
#include "my_crypt.h"
#include "my_crypt_key_management.h"
Expand Down
5 changes: 2 additions & 3 deletions encryption.h
Expand Up @@ -17,10 +17,9 @@
#ifndef MYSQL_RIPPLE_ENCRYPTION_H
#define MYSQL_RIPPLE_ENCRYPTION_H

#include <stdlib.h>
#include <stdio.h>

#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <memory>

#include "absl/strings/string_view.h"
Expand Down
2 changes: 1 addition & 1 deletion executor.cc
Expand Up @@ -14,7 +14,7 @@

#include "executor.h"

#include <stdlib.h>
#include <cstdlib>

#include "absl/time/time.h"

Expand Down
2 changes: 1 addition & 1 deletion file_base.h
Expand Up @@ -17,8 +17,8 @@
#ifndef MYSQL_RIPPLE_FILE_BASE_H
#define MYSQL_RIPPLE_FILE_BASE_H

#include <stdlib.h>
#include <cstdint>
#include <cstdlib>

#include "absl/strings/string_view.h"
#include "absl/time/time.h"
Expand Down
2 changes: 1 addition & 1 deletion gtid.cc
Expand Up @@ -314,7 +314,7 @@ std::string GTIDSet::ToString() const {
// guess that each interval takes 15 characters
dst.reserve(gtid_intervals_.size() * (Uuid::TEXT_LENGTH + 1 + 15));
for (const GTIDInterval& set : gtid_intervals_) {
if (dst.size() > 0)
if (!dst.empty())
dst += ',';
size_t len = dst.length();
dst.resize(len + Uuid::TEXT_LENGTH + 1);
Expand Down

0 comments on commit 3b76275

Please sign in to comment.