Skip to content

Commit

Permalink
Reworked S3 GetObject by moving it to a helper application
Browse files Browse the repository at this point in the history
Added a notes section to Contributing.md

Added the s3iomain.cpp and entries to the CMakefile to compile the
GetObject helper binary.

Starter is not a Singleton anymore.

Added IOHelper::getApplicationPath() to get the absolute path of the
running application. This is used to run the helper binary.

Added the S3GetObjectProcessWrapper, which is used to run the helper
binary in a separate process and thread.

Moved the async GetObject code from S3Source to the helper binary and
made it a bit more readable.

Added unsigned (U) suffixes to some numbers in tests.
  • Loading branch information
heinold committed Aug 12, 2019
1 parent 4f21d94 commit 1c078fd
Show file tree
Hide file tree
Showing 22 changed files with 435 additions and 100 deletions.
5 changes: 5 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ If you are adding a new file it should have a header like this:
*/
```

## Notes before you start

- FastqIndEx currently runs on Linux only. For example, it accesses
the /proc folder, which might not be available on other operating systems.

## FASTQ Index (FQI) Format, Version 1

FQI files are binary files with the following general structure:
Expand Down
14 changes: 13 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@ cmake_minimum_required(VERSION 3.13)
set(CMAKE_CXX_STANDARD 17)
#add_definitions(-DUSE_IMPORT_EXPORT)
#find_package(AWSSDK REQUIRED COMPONENTS s3)

# Main application
add_executable(fastqindex main.cpp main.h)

# Helper application for S3
# Refer to s3iomain for more info.
add_executable(fastqindexs3iohelper s3iomain.cpp)

include_directories(
${CMAKE_SOURCE_DIR}/src
)
Expand Down Expand Up @@ -42,6 +48,7 @@ add_library(
process/io/s3/FQIS3Client.h
process/io/s3/S3ServiceOptions.h
process/io/s3/S3Config.cpp process/io/s3/S3Config.h
process/io/s3/S3GetObjectProcessWrapper.cpp process/io/s3/S3GetObjectProcessWrapper.h
process/io/s3/S3Service.cpp process/io/s3/S3Service.h
process/io/s3/S3Sink.h
process/io/s3/S3Source.cpp process/io/s3/S3Source.h
Expand Down Expand Up @@ -83,4 +90,9 @@ target_link_libraries(
fastqindexlib
)

set_target_properties(fastqindex PROPERTIES COMPILE_FLAGS "-Wreturn-type -pedantic -ansi -Winit-self -Wextra -Winit-self -Wold-style-cast -Woverloaded-virtual -Wuninitialized")
target_link_libraries(
fastqindexs3iohelper
fastqindexlib
)

set_target_properties(fastqindex PROPERTIES COMPILE_FLAGS "-Wreturn-type -pedantic -ansi -Winit-self -Wextra -Wold-style-cast -Woverloaded-virtual -Wuninitialized")
3 changes: 3 additions & 0 deletions src/common/CommonStructsAndConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

using namespace std;

char FQI_BINARY[16384]{0};
char S3HELPER_BINARY[16384]{0};

const u_char MAGIC_NUMBER_RAW[4] = {1, 2, 3, 4};

/**
Expand Down
7 changes: 7 additions & 0 deletions src/common/CommonStructsAndConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
#include <string>
#include <zconf.h>

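/**
* Paths to the application binary (FQI_BINARY) and to the S3 helper binary (S3HELPER_BINARY). Ideally, these values
* would be const. But as we do not have the values when the variables are created, we just keep them here as writable
* buffers. FQI_BINARY is used for tests, S3HELPER_BINARY is used to build the command line for the S3 helper process.
*/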
extern char FQI_BINARY[16384];
extern char S3HELPER_BINARY[16384];

/**
* Used to identify a file as a file created by this binary.
*/
Expand Down
18 changes: 9 additions & 9 deletions src/common/ErrorAccumulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ void ErrorAccumulator::setVerbosity(int verbosity) {

bool ErrorAccumulator::verbosityIsSetToDebug() { return verbosity >= 3; }

void ErrorAccumulator::always(_cstr s0, _cstr s1, _cstr s2, _cstr s3, _cstr s4, _cstr s5) {
cerr << ErrorAccumulator::join(s0, s1, s2, s3, s4, s5) << "\n";
void ErrorAccumulator::always(_cstr s0, _cstr s1, _cstr s2, _cstr s3, _cstr s4, _cstr s5, _cstr s6) {
cerr << ErrorAccumulator::join(s0, s1, s2, s3, s4, s5, s6) << "\n";
}

void ErrorAccumulator::debug(_cstr s0, _cstr s1, _cstr s2, _cstr s3, _cstr s4, _cstr s5) {
if (verbosityIsSetToDebug()) cerr << ErrorAccumulator::join(s0, s1, s2, s3, s4, s5) << "\n";
void ErrorAccumulator::debug(_cstr s0, _cstr s1, _cstr s2, _cstr s3, _cstr s4, _cstr s5, _cstr s6) {
if (verbosityIsSetToDebug()) cerr << ErrorAccumulator::join(s0, s1, s2, s3, s4, s5, s6) << "\n";
}

void ErrorAccumulator::info(const string &msg) {
Expand All @@ -45,14 +45,14 @@ void ErrorAccumulator::severe(const string &msg) {

vector<string> ErrorAccumulator::getErrorMessages() { return errorMessages; }

void ErrorAccumulator::addErrorMessage(_cstr s0, _cstr s1, _cstr s2, _cstr s3, _cstr s4, _cstr s5) {
ErrorAccumulator::debug(s0, s1, s2, s3, s4, s5);
errorMessages.emplace_back(join(s0, s1, s2, s3, s4, s5));
void ErrorAccumulator::addErrorMessage(_cstr s0, _cstr s1, _cstr s2, _cstr s3, _cstr s4, _cstr s5, _cstr s6) {
ErrorAccumulator::debug(s0, s1, s2, s3, s4, s5, s6);
errorMessages.emplace_back(join(s0, s1, s2, s3, s4, s5, s6));
}

string ErrorAccumulator::join(_cstr s0, _cstr s1, _cstr s2, _cstr s3, _cstr s4, _cstr s5) {
string ErrorAccumulator::join(_cstr s0, _cstr s1, _cstr s2, _cstr s3, _cstr s4, _cstr s5, _cstr s6) {
ostringstream stream(s0);
stream << s0 << s1 << s2 << s3 << s4 << s5;
stream << s0 << s1 << s2 << s3 << s4 << s5 << s6;
return stream.str();
}

Expand Down
8 changes: 4 additions & 4 deletions src/common/ErrorAccumulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class ErrorAccumulator {

static bool verbosityIsSetToDebug();

static void always(_cstr s0, _cstr s1 = "", _cstr s2 = "", _cstr s3 = "", _cstr s4 = "", _cstr s5 = "");
static void always(_cstr s0, _cstr s1 = "", _cstr s2 = "", _cstr s3 = "", _cstr s4 = "", _cstr s5 = "", _cstr s6 = "");

static void debug(_cstr s0, _cstr s1 = "", _cstr s2 = "", _cstr s3 = "", _cstr s4 = "", _cstr s5 = "");
static void debug(_cstr s0, _cstr s1 = "", _cstr s2 = "", _cstr s3 = "", _cstr s4 = "", _cstr s5 = "", _cstr s6 = "");

static void info(const string &msg);

Expand All @@ -64,9 +64,9 @@ class ErrorAccumulator {
* This would actually be a perfect example for a variadic function but handling variadic functions is tricky as
* the 'va_...()' macros don't know about the number of passed arguments.
*/
void addErrorMessage(_cstr s0, _cstr s1 = "", _cstr s2 = "", _cstr s3 = "", _cstr s4 = "", _cstr s5 = "");
void addErrorMessage(_cstr s0, _cstr s1 = "", _cstr s2 = "", _cstr s3 = "", _cstr s4 = "", _cstr s5 = "", _cstr s6 = "");

static string join(_cstr s0, _cstr s1 = "", _cstr s2 = "", _cstr s3 = "", _cstr s4 = "", _cstr s5 = "");
static string join(_cstr s0, _cstr s1 = "", _cstr s2 = "", _cstr s3 = "", _cstr s4 = "", _cstr s5 = "", _cstr s6 = "");

/**
* This method can be used, if two vectors should be merged. Note, that we always copy the content of the two source
Expand Down
25 changes: 23 additions & 2 deletions src/common/IOHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,41 @@ tuple<bool, path> IOHelper::createTempFifo(const string &prefix) {
return {success, fifoPath};
}

/**
 * Resolve the given path to a canonical absolute path with realpath().
 *
 * Careful, this method does not necessarily return what you expect! Tried it with a binary file path and it got me
 * some path in my home directory. The binary was somewhere else!
 * NOTE(review): presumably this happens because a relative path is resolved against the current working directory
 * and not against the directory of the binary - confirm.
 *
 * @param file The path which shall be resolved.
 * @return The resolved path or, if realpath() fails (e.g. because the file does not exist), the unchanged input path.
 */
path IOHelper::fullPath(const path &file) {
    char buf[32768]{0};
    // readlink(file.string().c_str(), buf, 32768); // Not working! Returns \0
    // realpath() returns a null pointer on failure and leaves the buffer content unspecified, so the buffer must
    // only be used when the call succeeded.
    if (realpath(file.string().c_str(), buf) == nullptr)
        return file;
    return path(string(buf));
}

shared_ptr<map<string, string>> IOHelper::loadIniFile(const path &file, const string& section) {
/**
 * Retrieve the absolute path of the current executable by reading the "self/exe" link. This will only work with
 * operating systems which use a /proc or /user folder to hold process information.
 *
 * @return The path of the running executable or an empty path, if neither folder exists or the link could not be read.
 */
path IOHelper::getApplicationPath() {
    char buf[32768]{0};

    const char *link = nullptr;
    if (exists("/proc"))
        link = "/proc/self/exe";
    else if (exists("/user"))
        link = "/user/self/exe";

    if (link == nullptr) {
        ErrorAccumulator::severe(string("BUG: Could not find the folders '/proc' or '/user'. ") +
                                 "Are you running FastqIndEx on a compatible system?");
        return path(string(buf));
    }

    // readlink() does not append a terminating null byte. Leave room for one and terminate explicitly, so a very long
    // path cannot produce an unterminated buffer.
    ssize_t length = readlink(link, buf, sizeof(buf) - 1);
    if (length < 0) {
        ErrorAccumulator::severe(string("Could not read the link '") + link + "' to get the application path.");
        return path();
    }
    buf[length] = '\0';
    return path(string(buf));
}

shared_ptr<map<string, string>> IOHelper::loadIniFile(const path &file, const string &section) {
auto resultMap = make_shared<map<string, string>>();
CSimpleIniA configuration;
configuration.SetUnicode(true);
configuration.LoadFile(file.string().c_str());
CSimpleIniA::TNamesDepend keys;
configuration.GetAllKeys(section.c_str(), keys);
for (const auto& key : keys) {
for (const auto &key : keys) {
auto val = configuration.GetValue(section.c_str(), key.pItem);
(*resultMap)[string(key.pItem)] = string(val);
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/IOHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ class IOHelper {
*/
static path fullPath(const path &file);

static path getApplicationPath();

static shared_ptr<map<string, string>> loadIniFile(const path &file, const string &section);
};

Expand Down
16 changes: 11 additions & 5 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,18 @@
#include "main.h"
#include "startup/Starter.h"


int main(int argc, const char *argv[]) {
Starter *starter = Starter::getInstance();
shared_ptr<Runner> runner = starter->createRunner(argc, argv);
// Store the application binary path.
path fqiBinaryPath = IOHelper::getApplicationPath();
path s3HelperBinary = IOHelper::fullPath(fqiBinaryPath.parent_path().string() + "/fastqindexs3iohelper");
strcpy(FQI_BINARY, fqiBinaryPath.string().c_str());
strcpy(S3HELPER_BINARY, s3HelperBinary.string().c_str());
ErrorAccumulator::always("FQI binary path: '", fqiBinaryPath, "'");
ErrorAccumulator::always("S3 helper binary path: '", s3HelperBinary, "'");

Starter starter;
auto runner = starter.createRunner(argc, argv);

int exitCode = 0;
if (!runner->fulfillsPremises()) {
Expand All @@ -22,8 +31,5 @@ int main(int argc, const char *argv[]) {
exitCode = runner->run();
}

S3Service::closeIfOpened();

delete starter;
return exitCode;
}
1 change: 1 addition & 0 deletions src/main.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

using namespace std;


#ifndef __MAIN_H__
#define __MAIN_H__

Expand Down
11 changes: 9 additions & 2 deletions src/process/io/s3/FQIS3Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ class FQIS3Client : ErrorAccumulator {

string object_name;

public:
S3ServiceOptions serviceOptions;

public:

FQIS3Client(const string &s3Path,
const S3ServiceOptions &s3ServiceOptions) {
this->s3Path = s3Path;
this->s3Config = S3Service::getInstance()->getConfig();
this->serviceOptions = s3ServiceOptions;
auto split = StringHelper::splitStr(s3Path, '/'); // s3://bucket/object

if (split.size() != 4) {
Expand Down Expand Up @@ -101,6 +103,11 @@ class FQIS3Client : ErrorAccumulator {
return object_name;
}

/**
 * @return A copy of the S3 service options stored in this client. The method does not modify the client and is
 *         therefore const-qualified.
 */
S3ServiceOptions getS3ServiceOptions() const {
    return serviceOptions;
}


template<typename T>
bool performS3Request(function<T(S3Client &client)> s3Request) {
bool result;
Expand Down Expand Up @@ -165,7 +172,7 @@ class FQIS3Client : ErrorAccumulator {
if (!success)
return {false, 0};

for (const auto& entry : list) {
for (const auto &entry : list) {
if (entry.name == object_name) {
found = true;
size = entry.size;
Expand Down
39 changes: 39 additions & 0 deletions src/process/io/s3/S3GetObjectProcessWrapper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright (c) 2019 DKFZ - ODCF
*
* Distributed under the MIT License (license terms are at https://github.com/dkfz-odcf/FastqIndEx/blob/master/LICENSE.txt).
*/

#include "common/CommonStructsAndConstants.h"
#include "S3GetObjectProcessWrapper.h"

void S3GetObjectProcessWrapper::processThreadFunc(S3GetObjectProcessWrapper *wrapper) {
stringstream cmd;
// We need to pass double ticks with a whitespace in it (" ") if you have an empty argument.
// "" and '' were not regarded as parameters in our tests.
cmd << "\"" << S3HELPER_BINARY << "\" \""
<< wrapper->fifo.string() << "\" \""
<< wrapper->s3Object << "\" \""
<< wrapper->serviceOptions.configSection << "\" \""
<< wrapper->serviceOptions.configFile.string() << " \" \""
<< wrapper->serviceOptions.credentialsFile.string() << " \" \""
<< wrapper->readStart << "\""; // Single ticks will be regarded as two parameters with a space inside...
ErrorAccumulator::debug("Async call: ", cmd.str());
int result = system(cmd.str().c_str());
ErrorAccumulator::debug("Async call to S3 started with result of: ", to_string(result));
}

void S3GetObjectProcessWrapper::start() {
lock_guard<mutex> lock(mtx);
if (!processThread) {
processThread = make_shared<thread>(processThreadFunc, this);
}
}

void S3GetObjectProcessWrapper::waitForFinish() {
lock_guard<mutex> lock(mtx); // Prevent a new process from being started
if (processThread) {
processThread->join();
processThread.reset();
}
}
89 changes: 89 additions & 0 deletions src/process/io/s3/S3GetObjectProcessWrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Copyright (c) 2019 DKFZ - ODCF
*
* Distributed under the MIT License (license terms are at https://github.com/dkfz-odcf/FastqIndEx/blob/master/LICENSE.txt).
*/

#ifndef FASTQINDEX_S3GETOBJECTPROCESSWRAPPER_H
#define FASTQINDEX_S3GETOBJECTPROCESSWRAPPER_H

#include "common/ErrorAccumulator.h"
#include "S3ServiceOptions.h"
#include <thread>
#include <mutex>

using namespace std;

/**
* The class is a wrapper for the tool binary "fastqindexs3iohelper".
* The whole purpose of this is to get a better user experience by swallowing error messages like:
* - SIGABRT
* - SIGSEV
* - SIGPIP
* all coming up because I did not find (and I searched for it!) a good way to abort a get object request for S3.
*/
class S3GetObjectProcessWrapper : public ErrorAccumulator {
private:

/**
* The options for S3.
*/
S3ServiceOptions serviceOptions;

/**
* The fifo (or file) to write to.
*/
path fifo;

/**
* The object which shall be downloaded.
*/
string s3Object;

/**
* Where to start reading.
*/
int64_t readStart;

/**
* Mutex to perform thread-safe operations in this class
*/
mutex mtx;

/**
* The Thread which is used to run the child S3 process.
*/
shared_ptr<thread> processThread;

static void processThreadFunc(S3GetObjectProcessWrapper *wrapper);

public:

explicit S3GetObjectProcessWrapper(const S3ServiceOptions &serviceOptions,
const path &fifo,
const string &s3Object,
int64_t readStart) {
this->serviceOptions = serviceOptions;
this->fifo = fifo;
this->s3Object = s3Object;
this->readStart = readStart;
}

virtual ~S3GetObjectProcessWrapper() {
waitForFinish();
ErrorAccumulator::debug("Abort Thread!");
}

/**
* Start a download process, if no process is already active. Will not stop an active process.
*/
void start();

/**
* Will call join on the underlying thread and thus wait until the download process ends.
* To really end
*/
void waitForFinish();
};

#endif //FASTQINDEX_S3GETOBJECTPROCESSWRAPPER_H

0 comments on commit 1c078fd

Please sign in to comment.