Fix db_stress for custom env (#5122)
Summary:
Fix some hdfs-related code so that it can compile and run 'db_stress'
Pull Request resolved: #5122

Differential Revision: D14675495

Pulled By: riversand963

fbshipit-source-id: cac280479efcf5451982558947eac1732e8bc45a
riversand963 authored and facebook-github-bot committed Mar 29, 2019
1 parent dae3b55 commit d77476e
Showing 5 changed files with 81 additions and 75 deletions.
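
The "custom env" here is RocksDB's HdfsEnv, which routes all file I/O through libhdfs. For orientation, a minimal sketch of pointing a database at that env, assuming a RocksDB build with USE_HDFS=1 and a namenode reachable at hdfs://localhost:9000/ (illustrative only, not part of this patch):

#include <cassert>

#include "hdfs/env_hdfs.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  // Assumed namenode URI; substitute your cluster's address.
  rocksdb::Env* hdfs_env = new rocksdb::HdfsEnv("hdfs://localhost:9000/");

  rocksdb::Options options;
  options.create_if_missing = true;
  options.env = hdfs_env;  // all file I/O now goes through libhdfs

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/rocksdb_test_db", &db);
  assert(s.ok());
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());

  delete db;
  delete hdfs_env;
  return 0;
}

db_stress keeps its chosen env in FLAGS_env, which is why the tools/db_stress.cc hunk at the end of this diff threads that env into DestroyDB explicitly.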
4 changes: 2 additions & 2 deletions build_tools/build_detect_platform
@@ -518,8 +518,8 @@ if test "$USE_HDFS"; then
     echo "JAVA_HOME has to be set for HDFS usage."
     exit 1
   fi
-  HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS"
-  HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64"
+  HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS -I$HADOOP_HOME/include"
+  HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64 -L$HADOOP_HOME/lib/native"
   HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib"
   HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm"
   COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS"
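
The new -I$HADOOP_HOME/include and -L$HADOOP_HOME/lib/native entries point at the locations where a stock Hadoop distribution ships hdfs.h and libhdfs. A throwaway smoke test along these lines (not part of the patch; it assumes HADOOP_HOME is exported and the Hadoop config on the CLASSPATH names a reachable filesystem) is a quick way to confirm the compile and link flags resolve:

#include <cstdio>

#include <hdfs.h>  // found via -I$HADOOP_HOME/include

int main() {
  // "default" picks up fs.defaultFS from the Hadoop configuration.
  hdfsFS fs = hdfsConnect("default", 0);
  if (fs == nullptr) {
    std::fprintf(stderr, "hdfsConnect failed\n");
    return 1;
  }
  std::printf("connected to HDFS\n");
  hdfsDisconnect(fs);
  return 0;
}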
40 changes: 21 additions & 19 deletions env/env_hdfs.cc
@@ -11,13 +11,14 @@
 #ifndef ROCKSDB_HDFS_FILE_C
 #define ROCKSDB_HDFS_FILE_C

-#include <algorithm>
 #include <stdio.h>
 #include <sys/time.h>
 #include <time.h>
+#include <algorithm>
 #include <iostream>
 #include <sstream>
 #include "rocksdb/status.h"
+#include "util/logging.h"
 #include "util/string_util.h"

 #define HDFS_EXISTS 0
@@ -224,7 +225,7 @@ class HdfsWritableFile: public WritableFile {
                     filename_.c_str());
     const char* src = data.data();
     size_t left = data.size();
-    size_t ret = hdfsWrite(fileSys_, hfile_, src, left);
+    size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
                     filename_.c_str());
     if (ret != left) {
@@ -254,7 +255,8 @@

   // This is used by HdfsLogger to write data to the debug log file
   virtual Status Append(const char* src, size_t size) {
-    if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) {
+    if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
+        static_cast<tSize>(size)) {
       return IOError(filename_, errno);
     }
     return Status::OK();
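
Both casts exist because hdfsWrite takes and returns tSize, a 32-bit signed type in libhdfs, so passing a size_t would otherwise trigger narrowing warnings. A single write larger than tSize's range would still be truncated at the libhdfs boundary; a chunking wrapper roughly like the following (a hypothetical helper, not something this commit adds) sidesteps that:

#include <algorithm>
#include <cstddef>
#include <limits>

#include <hdfs.h>

// Hypothetical helper: write 'size' bytes in chunks that each fit in tSize.
// Returns true on success, false on the first failed or empty write.
bool WriteFully(hdfsFS fs, hdfsFile file, const char* src, size_t size) {
  const size_t kMaxChunk =
      static_cast<size_t>(std::numeric_limits<tSize>::max());
  while (size > 0) {
    const size_t chunk = std::min(size, kMaxChunk);
    tSize written = hdfsWrite(fs, file, src, static_cast<tSize>(chunk));
    if (written <= 0) {
      return false;  // caller can inspect errno
    }
    src += written;
    size -= static_cast<size_t>(written);
  }
  return true;
}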
@@ -282,11 +284,10 @@ class HdfsLogger : public Logger {
   Status HdfsCloseHelper() {
     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
                     file_->getName().c_str());
-    Status s = file_->Close();
     if (mylog != nullptr && mylog == this) {
       mylog = nullptr;
     }
-    return s;
+    return Status::OK();
   }

  protected:
@@ -299,14 +300,15 @@
                     file_->getName().c_str());
   }

-  virtual ~HdfsLogger() {
+  ~HdfsLogger() override {
     if (!closed_) {
       closed_ = true;
       HdfsCloseHelper();
     }
   }

-  virtual void Logv(const char* format, va_list ap) {
+  using Logger::Logv;
+  void Logv(const char* format, va_list ap) override {
     const uint64_t thread_id = (*gettid_)();

     // We try twice: the first time with a fixed-size stack allocated buffer,
@@ -384,7 +386,7 @@ const std::string HdfsEnv::pathsep = "/";
 // open a file for sequential reading
 Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                   std::unique_ptr<SequentialFile>* result,
-                                  const EnvOptions& options) {
+                                  const EnvOptions& /*options*/) {
   result->reset();
   HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
   if (f == nullptr || !f->isValid()) {
@@ -399,7 +401,7 @@ Status HdfsEnv::NewSequentialFile(const std::string& fname,
 // open a file for random reading
 Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                     std::unique_ptr<RandomAccessFile>* result,
-                                    const EnvOptions& options) {
+                                    const EnvOptions& /*options*/) {
   result->reset();
   HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
   if (f == nullptr || !f->isValid()) {
@@ -414,7 +416,7 @@ Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
 // create a new file for writing
 Status HdfsEnv::NewWritableFile(const std::string& fname,
                                 std::unique_ptr<WritableFile>* result,
-                                const EnvOptions& options) {
+                                const EnvOptions& /*options*/) {
   result->reset();
   Status s;
   HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
@@ -432,7 +434,9 @@ class HdfsDirectory : public Directory {
   explicit HdfsDirectory(int fd) : fd_(fd) {}
   ~HdfsDirectory() {}

-  virtual Status Fsync() { return Status::OK(); }
+  Status Fsync() override { return Status::OK(); }
+
+  int GetFd() const { return fd_; }

  private:
   int fd_;
@@ -477,10 +481,10 @@ Status HdfsEnv::GetChildren(const std::string& path,
     pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
     if (numEntries >= 0) {
       for(int i = 0; i < numEntries; i++) {
-        char* pathname = pHdfsFileInfo[i].mName;
-        char* filename = std::rindex(pathname, '/');
-        if (filename != nullptr) {
-          result->push_back(filename+1);
+        std::string pathname(pHdfsFileInfo[i].mName);
+        size_t pos = pathname.rfind("/");
+        if (std::string::npos != pos) {
+          result->push_back(pathname.substr(pos + 1));
         }
       }
       if (pHdfsFileInfo != nullptr) {
@@ -571,16 +575,14 @@ Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
   return IOError(src, errno);
 }

-Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
+Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
   // there isn's a very good way to atomically check and create
   // a file via libhdfs
   *lock = nullptr;
   return Status::OK();
 }

-Status HdfsEnv::UnlockFile(FileLock* lock) {
-  return Status::OK();
-}
+Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }

 Status HdfsEnv::NewLogger(const std::string& fname,
                           std::shared_ptr<Logger>* result) {
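
One non-mechanical detail in the HdfsLogger hunk above is the added 'using Logger::Logv;'. The base Logger declares more than one Logv overload, and a derived class that declares only the (const char*, va_list) form hides the rest; the using-declaration brings the base overloads back into scope. The language rule, reduced to a standalone sketch with illustrative names rather than RocksDB's actual classes:

#include <cstdarg>
#include <cstdio>

struct BaseLogger {
  virtual ~BaseLogger() = default;
  virtual void Logv(const char* format, va_list ap) { std::vprintf(format, ap); }
  // Convenience overload; it would be hidden in a derived class that declares
  // its own Logv without a using-declaration.
  void Logv(const char* format, ...) {
    va_list ap;
    va_start(ap, format);
    Logv(format, ap);  // virtual dispatch to the most-derived override
    va_end(ap);
  }
};

struct DerivedLogger : BaseLogger {
  using BaseLogger::Logv;  // keep the variadic overload visible
  void Logv(const char* format, va_list ap) override {
    std::fprintf(stderr, "[derived] ");
    std::vfprintf(stderr, format, ap);
  }
};

int main() {
  DerivedLogger logger;
  logger.Logv("%d files checked\n", 5);  // compiles thanks to the using-declaration
  return 0;
}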
99 changes: 48 additions & 51 deletions hdfs/env_hdfs.h
@@ -54,110 +54,109 @@ class HdfsEnv : public Env {
     hdfsDisconnect(fileSys_);
   }

-  virtual Status NewSequentialFile(const std::string& fname,
-                                   std::unique_ptr<SequentialFile>* result,
-                                   const EnvOptions& options);
+  Status NewSequentialFile(const std::string& fname,
+                           std::unique_ptr<SequentialFile>* result,
+                           const EnvOptions& options) override;

-  virtual Status NewRandomAccessFile(const std::string& fname,
-                                     std::unique_ptr<RandomAccessFile>* result,
-                                     const EnvOptions& options);
+  Status NewRandomAccessFile(const std::string& fname,
+                             std::unique_ptr<RandomAccessFile>* result,
+                             const EnvOptions& options) override;

-  virtual Status NewWritableFile(const std::string& fname,
-                                 std::unique_ptr<WritableFile>* result,
-                                 const EnvOptions& options);
+  Status NewWritableFile(const std::string& fname,
+                         std::unique_ptr<WritableFile>* result,
+                         const EnvOptions& options) override;

-  virtual Status NewDirectory(const std::string& name,
-                              std::unique_ptr<Directory>* result);
+  Status NewDirectory(const std::string& name,
+                      std::unique_ptr<Directory>* result) override;

-  virtual Status FileExists(const std::string& fname);
+  Status FileExists(const std::string& fname) override;

-  virtual Status GetChildren(const std::string& path,
-                             std::vector<std::string>* result);
+  Status GetChildren(const std::string& path,
+                     std::vector<std::string>* result) override;

-  virtual Status DeleteFile(const std::string& fname);
+  Status DeleteFile(const std::string& fname) override;

-  virtual Status CreateDir(const std::string& name);
+  Status CreateDir(const std::string& name) override;

-  virtual Status CreateDirIfMissing(const std::string& name);
+  Status CreateDirIfMissing(const std::string& name) override;

-  virtual Status DeleteDir(const std::string& name);
+  Status DeleteDir(const std::string& name) override;

-  virtual Status GetFileSize(const std::string& fname, uint64_t* size);
+  Status GetFileSize(const std::string& fname, uint64_t* size) override;

-  virtual Status GetFileModificationTime(const std::string& fname,
-                                         uint64_t* file_mtime);
+  Status GetFileModificationTime(const std::string& fname,
+                                 uint64_t* file_mtime) override;

-  virtual Status RenameFile(const std::string& src, const std::string& target);
+  Status RenameFile(const std::string& src, const std::string& target) override;

-  virtual Status LinkFile(const std::string& src, const std::string& target) {
+  Status LinkFile(const std::string& /*src*/,
+                  const std::string& /*target*/) override {
     return Status::NotSupported(); // not supported
   }

-  virtual Status LockFile(const std::string& fname, FileLock** lock);
+  Status LockFile(const std::string& fname, FileLock** lock) override;

-  virtual Status UnlockFile(FileLock* lock);
+  Status UnlockFile(FileLock* lock) override;

-  virtual Status NewLogger(const std::string& fname,
-                           std::shared_ptr<Logger>* result);
+  Status NewLogger(const std::string& fname,
+                   std::shared_ptr<Logger>* result) override;

-  virtual void Schedule(void (*function)(void* arg), void* arg,
-                        Priority pri = LOW, void* tag = nullptr, void (*unschedFunction)(void* arg) = 0) {
+  void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,
+                void* tag = nullptr,
+                void (*unschedFunction)(void* arg) = 0) override {
     posixEnv->Schedule(function, arg, pri, tag, unschedFunction);
   }

-  virtual int UnSchedule(void* tag, Priority pri) {
+  int UnSchedule(void* tag, Priority pri) override {
     return posixEnv->UnSchedule(tag, pri);
   }

-  virtual void StartThread(void (*function)(void* arg), void* arg) {
+  void StartThread(void (*function)(void* arg), void* arg) override {
     posixEnv->StartThread(function, arg);
   }

-  virtual void WaitForJoin() { posixEnv->WaitForJoin(); }
+  void WaitForJoin() override { posixEnv->WaitForJoin(); }

-  virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const
-      override {
+  unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
     return posixEnv->GetThreadPoolQueueLen(pri);
   }

-  virtual Status GetTestDirectory(std::string* path) {
+  Status GetTestDirectory(std::string* path) override {
     return posixEnv->GetTestDirectory(path);
   }

-  virtual uint64_t NowMicros() {
-    return posixEnv->NowMicros();
-  }
+  uint64_t NowMicros() override { return posixEnv->NowMicros(); }

-  virtual void SleepForMicroseconds(int micros) {
+  void SleepForMicroseconds(int micros) override {
     posixEnv->SleepForMicroseconds(micros);
   }

-  virtual Status GetHostName(char* name, uint64_t len) {
+  Status GetHostName(char* name, uint64_t len) override {
     return posixEnv->GetHostName(name, len);
   }

-  virtual Status GetCurrentTime(int64_t* unix_time) {
+  Status GetCurrentTime(int64_t* unix_time) override {
     return posixEnv->GetCurrentTime(unix_time);
   }

-  virtual Status GetAbsolutePath(const std::string& db_path,
-                                 std::string* output_path) {
+  Status GetAbsolutePath(const std::string& db_path,
+                         std::string* output_path) override {
     return posixEnv->GetAbsolutePath(db_path, output_path);
   }

-  virtual void SetBackgroundThreads(int number, Priority pri = LOW) {
+  void SetBackgroundThreads(int number, Priority pri = LOW) override {
     posixEnv->SetBackgroundThreads(number, pri);
   }

-  virtual int GetBackgroundThreads(Priority pri = LOW) {
+  int GetBackgroundThreads(Priority pri = LOW) override {
     return posixEnv->GetBackgroundThreads(pri);
   }

-  virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
+  void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
     posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
   }

-  virtual std::string TimeToString(uint64_t number) {
+  std::string TimeToString(uint64_t number) override {
     return posixEnv->TimeToString(number);
   }

@@ -166,9 +165,7 @@ class HdfsEnv : public Env {
     return (uint64_t)pthread_self();
   }

-  virtual uint64_t GetThreadID() const override {
-    return HdfsEnv::gettid();
-  }
+  uint64_t GetThreadID() const override { return HdfsEnv::gettid(); }

  private:
   std::string fsname_;  // string of the form "hdfs://hostname:port/"

@@ -206,7 +203,7 @@ class HdfsEnv : public Env {
     std::string host(parts[0]);
     std::string remaining(parts[1]);

-    int rem = remaining.find(pathsep);
+    int rem = static_cast<int>(remaining.find(pathsep));
     std::string portStr = (rem == 0 ? remaining :
                            remaining.substr(0, rem));

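
Most of the churn in this header is mechanical: 'virtual' on the overriders is replaced with 'override', so a declaration that drifts out of sync with the base class becomes a compile error instead of a silently unrelated virtual function. A minimal illustration with generic names, not the RocksDB classes:

class Env {
 public:
  virtual ~Env() = default;
  virtual int GetBackgroundThreads(int pri = 0) { return 1; }
};

class CustomEnv : public Env {
 public:
  // OK: the signature matches the base declaration, so this overrides it.
  int GetBackgroundThreads(int pri = 0) override { return 4; }

  // Would not compile if uncommented: 'override' flags the parameter-type
  // mismatch instead of quietly introducing a new, unrelated function.
  // int GetBackgroundThreads(unsigned int pri) override { return 4; }
};

int main() {
  CustomEnv env;
  Env* base = &env;
  return base->GetBackgroundThreads() == 4 ? 0 : 1;  // dispatches to the override
}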
4 changes: 2 additions & 2 deletions hdfs/setup.sh
100644 → 100755
@@ -1,8 +1,8 @@
 # shellcheck disable=SC2148
 export USE_HDFS=1
-export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:/usr/lib/hadoop/lib/native
+export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:$HADOOP_HOME/lib/native

-export CLASSPATH=
+export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
 for f in `find /usr/lib/hadoop-hdfs | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
 for f in `find /usr/lib/hadoop | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
 for f in `find /usr/lib/hadoop/client | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
9 changes: 8 additions & 1 deletion tools/db_stress.cc
@@ -1403,7 +1403,14 @@ class StressTest {
           FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
         }
       }
-      DestroyDB(FLAGS_db, Options());
+      Options options;
+      options.env = FLAGS_env;
+      Status s = DestroyDB(FLAGS_db, options);
+      if (!s.ok()) {
+        fprintf(stderr, "Cannot destroy original db: %s\n",
+                s.ToString().c_str());
+        exit(1);
+      }
     }
   }
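
This hunk is the core of the fix: DestroyDB removes files through options.env, so calling it with a default-constructed Options while the test database lives in HDFS would attempt the cleanup through the default POSIX env and miss (or fail on) the HDFS files. A sketch of the corrected pattern in isolation, with a placeholder URI and path:

#include <cstdio>

#include "hdfs/env_hdfs.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Env* hdfs_env = new rocksdb::HdfsEnv("hdfs://localhost:9000/");

  rocksdb::Options options;
  options.env = hdfs_env;  // DestroyDB lists and deletes files via this env

  rocksdb::Status s = rocksdb::DestroyDB("/rocksdb_stress_db", options);
  if (!s.ok()) {
    std::fprintf(stderr, "Cannot destroy db: %s\n", s.ToString().c_str());
    delete hdfs_env;
    return 1;
  }
  delete hdfs_env;
  return 0;
}

Passing the env that owns the files, here FLAGS_env in db_stress, is what allows a stress run to clear out a previous run's database when that env is a custom one.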
