Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ matrix:
- package='openssl'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package}

script:
- mkdir ./build && cd ./build && cmake .. && make && ./tests
- mkdir ./build && cd ./build && cmake .. && make && make test
15 changes: 15 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,21 @@ enable_testing(test)
target_include_directories(tests PRIVATE BEFORE "libminifi/include/provenance")
target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp)
add_test(NAME LibMinifiTests COMMAND tests)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try and make this less duplicative?

file(GLOB LIBMINIFI_TEST_EXECUTE_PROCESS "libminifi/test/TestExecuteProcess.cpp")
add_executable(testExecuteProcess ${LIBMINIFI_TEST_EXECUTE_PROCESS} ${SPD_SOURCES})
target_include_directories(testExecuteProcess PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
target_include_directories(testExecuteProcess PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS})
target_include_directories(testExecuteProcess PRIVATE BEFORE "include")
target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/")
target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core")
target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core/repository")
target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/io")
target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/utils")
target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/processors")
target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/provenance")
target_link_libraries(testExecuteProcess ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp)
add_test(NAME ExecuteProcess COMMAND testExecuteProcess)

# Create a custom build target called "docker" that will invoke DockerBuild.sh and create the NiFi-MiNiFi-CPP Docker image
add_custom_target(
Expand Down
16 changes: 11 additions & 5 deletions libminifi/include/core/ConfigurableComponent.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@
#ifndef LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_
#define LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_

#include <mutex>
#include <iostream>
#include <map>
#include <memory>
#include <set>

#include "logging/Logger.h"
#include "Property.h"
#include "core/logging/Logger.h"

namespace org {
namespace apache {
Expand All @@ -35,10 +41,8 @@ namespace core {
class ConfigurableComponent {
public:


ConfigurableComponent() = delete;


explicit ConfigurableComponent(std::shared_ptr<logging::Logger> logger);

explicit ConfigurableComponent(const ConfigurableComponent &&other);
Expand Down Expand Up @@ -81,18 +85,20 @@ class ConfigurableComponent {

protected:


/**
* Returns true if the instance can be edited.
* @return true/false
*/
virtual bool canEdit()= 0;

std::mutex configuration_mutex_;
std::shared_ptr<logging::Logger> logger_;

// Supported properties
std::map<std::string, Property> properties_;

private:
std::shared_ptr<logging::Logger> my_logger_;

};

} /* namespace core */
Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/core/FlowConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
#include "processors/GetFile.h"
#include "processors/PutFile.h"
#include "processors/TailFile.h"
#include "processors/ListenHTTP.h"
#include "processors/ListenSyslog.h"
#include "processors/GenerateFlowFile.h"
#include "processors/RealTimeDataCollector.h"
#include "processors/ListenHTTP.h"
#include "processors/LogAttribute.h"
#include "processors/ExecuteProcess.h"
#include "processors/AppendHostInfo.h"
Expand Down
17 changes: 15 additions & 2 deletions libminifi/include/core/ProcessSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ class ProcessSession {
// Create a new UUID FlowFile with no content resource claim and without parent
std::shared_ptr<core::FlowFile> create();
// Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &parent);
std::shared_ptr<core::FlowFile> create(
std::shared_ptr<core::FlowFile> &&parent);

std::shared_ptr<core::FlowFile> create(
std::shared_ptr<core::FlowFile> &parent){
return create(parent);
}
// Clone a new UUID FlowFile from parent both for content resource claim and attributes
std::shared_ptr<core::FlowFile> clone(
std::shared_ptr<core::FlowFile> &parent);
Expand Down Expand Up @@ -121,7 +127,14 @@ class ProcessSession {
// Penalize the flow
void penalize(std::shared_ptr<core::FlowFile> &flow);
void penalize(std::shared_ptr<core::FlowFile> &&flow);
// Import the existed file into the flow

/**
* Imports a file from the data stream
* @param stream incoming data stream that contains the data to store into a file
* @param flow flow file
*/
void importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow);
// import from the data source.
void import(std::string source, std::shared_ptr<core::FlowFile> &flow,
bool keepSource = true, uint64_t offset = 0);
void import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
Expand Down
2 changes: 0 additions & 2 deletions libminifi/include/core/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,6 @@ class Processor : public Connectable, public ConfigurableComponent,

// Check all incoming connections for work
bool isWorkAvailable();
// Logger
std::shared_ptr<logging::Logger> logger_;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
Processor(const Processor &parent);
Expand Down
77 changes: 43 additions & 34 deletions libminifi/include/processors/GetFile.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
/**
* @file GetFile.h
* GetFile class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
Expand All @@ -20,6 +18,7 @@
#ifndef __GET_FILE_H__
#define __GET_FILE_H__

#include <atomic>
#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
Expand All @@ -31,28 +30,30 @@ namespace nifi {
namespace minifi {
namespace processors {

struct GetFileRequest{
std::string directory = ".";
bool recursive = true;
bool keepSourceFile = false;
int64_t minAge = 0;
int64_t maxAge = 0;
int64_t minSize = 0;
int64_t maxSize = 0;
bool ignoreHiddenFile = true;
int64_t pollInterval = 0;
int64_t batchSize = 10;
std::string fileFilter= "[^\\.].*";
};

// GetFile Class
class GetFile : public core::Processor {
public:
// Constructor
/*!
* Create a new processor
*/
GetFile(std::string name, uuid_t uuid = NULL)
explicit GetFile(std::string name, uuid_t uuid = NULL)
: Processor(name, uuid) {
logger_ = logging::Logger::getLogger();
_directory = ".";
_recursive = true;
_keepSourceFile = false;
_minAge = 0;
_maxAge = 0;
_minSize = 0;
_maxSize = 0;
_ignoreHiddenFile = true;
_pollInterval = 0;
_batchSize = 10;
_lastDirectoryListingTime = getTimeMillis();
_fileFilter = "[^\\.].*";

}
// Destructor
virtual ~GetFile() {
Expand All @@ -79,16 +80,28 @@ class GetFile : public core::Processor {
virtual void onTrigger(
core::ProcessContext *context,
core::ProcessSession *session);
/**
* Function that's executed when the processor is scheduled.
* @param context process context.
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
void onSchedule(
core::ProcessContext *context,
core::ProcessSessionFactory *sessionFactory);
// Initialize, over write by NiFi GetFile
virtual void initialize(void);
// perform directory listing
void performListing(std::string dir);
/**
* performs a listeing on the directory.
* @param dir directory to list
* @param request get file request.
*/
void performListing(std::string dir,const GetFileRequest &request);

protected:

private:
// Logger
std::shared_ptr<logging::Logger> logger_;

// Queue for store directory list
std::queue<std::string> _dirList;
// Get Listing size
Expand All @@ -101,23 +114,19 @@ class GetFile : public core::Processor {
// Put full path file name into directory listing
void putListing(std::string fileName);
// Poll directory listing for files
void pollListing(std::queue<std::string> &list, int maxSize);
void pollListing(std::queue<std::string> &list,const GetFileRequest &request);
// Check whether file can be added to the directory listing
bool acceptFile(std::string fullName, std::string name);
bool acceptFile(std::string fullName, std::string name, const GetFileRequest &request);
// Get file request object.
GetFileRequest request_;
// Mutex for protection of the directory listing

std::mutex mutex_;
std::string _directory;
bool _recursive;
bool _keepSourceFile;
int64_t _minAge;
int64_t _maxAge;
int64_t _minSize;
int64_t _maxSize;
bool _ignoreHiddenFile;
int64_t _pollInterval;
int64_t _batchSize;
uint64_t _lastDirectoryListingTime;
std::string _fileFilter;

// last listing time for root directory ( if recursive, we will consider the root
// as the top level time.
std::atomic<uint64_t> last_listing_time_;

};

} /* namespace processors */
Expand Down
22 changes: 16 additions & 6 deletions libminifi/include/processors/PutFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class PutFile : public core::Processor {
*/
PutFile(std::string name, uuid_t uuid = NULL)
: core::Processor(name, uuid) {
logger_ = logging::Logger::getLogger();
}
// Destructor
virtual ~PutFile() {
Expand All @@ -59,10 +58,18 @@ class PutFile : public core::Processor {
static core::Relationship Success;
static core::Relationship Failure;

/**
* Function that's executed when the processor is scheduled.
* @param context process context.
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
void onSchedule(core::ProcessContext *context,
core::ProcessSessionFactory *sessionFactory);

// OnTrigger method, implemented by NiFi PutFile
virtual void onTrigger(
core::ProcessContext *context,
core::ProcessSession *session);
virtual void onTrigger(core::ProcessContext *context,
core::ProcessSession *session);
// Initialize, over write by NiFi PutFile
virtual void initialize(void);

Expand All @@ -84,8 +91,11 @@ class PutFile : public core::Processor {
protected:

private:
// Logger
std::shared_ptr<logging::Logger> logger_;

// directory
std::string directory_;
// conflict resolution type.
std::string conflict_resolution_;

bool putFile(core::ProcessSession *session,
std::shared_ptr<FlowFileRecord> flowFile,
Expand Down
19 changes: 8 additions & 11 deletions libminifi/include/processors/TailFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ class TailFile : public core::Processor {
/*!
* Create a new processor
*/
TailFile(std::string name, uuid_t uuid = NULL)
explicit TailFile(std::string name, uuid_t uuid = NULL)
: core::Processor(name, uuid) {
logger_ = logging::Logger::getLogger();
_stateRecovered = false;
}
// Destructor
Expand All @@ -57,9 +56,8 @@ class TailFile : public core::Processor {

public:
// OnTrigger method, implemented by NiFi TailFile
virtual void onTrigger(
core::ProcessContext *context,
core::ProcessSession *session);
virtual void onTrigger(core::ProcessContext *context,
core::ProcessSession *session);
// Initialize, over write by NiFi TailFile
virtual void initialize(void);
// recoverState
Expand All @@ -70,11 +68,7 @@ class TailFile : public core::Processor {
protected:

private:
// Logger
std::shared_ptr<logging::Logger> logger_;
std::string _fileLocation;
// Property Specified Tailed File Name
std::string _fileName;
std::mutex tail_file_mutex_;
// File to save state
std::string _stateFile;
// State related to the tailed file
Expand All @@ -86,7 +80,10 @@ class TailFile : public core::Processor {
std::string trimLeft(const std::string& s);
std::string trimRight(const std::string& s);
void parseStateFileLine(char *buf);
void checkRollOver();
/**
* Check roll over for the provided file.
*/
void checkRollOver(const std::string &, const std::string&);

};

Expand Down
Loading