Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libminifi/include/FlowController.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "RemoteProcessorGroupPort.h"
#include "Provenance.h"
#include "GetFile.h"
#include "PutFile.h"
#include "TailFile.h"
#include "ListenSyslog.h"
#include "ExecuteProcess.h"
Expand Down
88 changes: 88 additions & 0 deletions libminifi/include/PutFile.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* @file PutFile.h
* PutFile class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __PUT_FILE_H__
#define __PUT_FILE_H__

#include "FlowFileRecord.h"
#include "Processor.h"
#include "ProcessSession.h"

//! PutFile Class
class PutFile : public Processor
{
public:

static const std::string CONFLICT_RESOLUTION_STRATEGY_REPLACE;
static const std::string CONFLICT_RESOLUTION_STRATEGY_IGNORE;
static const std::string CONFLICT_RESOLUTION_STRATEGY_FAIL;

//! Constructor
/*!
* Create a new processor
*/
PutFile(std::string name, uuid_t uuid = NULL)
: Processor(name, uuid)
{
_logger = Logger::getLogger();
}
//! Destructor
virtual ~PutFile()
{
}
//! Processor Name
static const std::string ProcessorName;
//! Supported Properties
static Property Directory;
static Property ConflictResolution;
//! Supported Relationships
static Relationship Success;
static Relationship Failure;

//! OnTrigger method, implemented by NiFi PutFile
virtual void onTrigger(ProcessContext *context, ProcessSession *session);
//! Initialize, over write by NiFi PutFile
virtual void initialize(void);

class ReadCallback : public InputStreamCallback
{
public:
ReadCallback(const std::string &tmpFile, const std::string &destFile);
~ReadCallback();
virtual void process(std::ifstream *stream);
bool commit();

private:
Logger *_logger;
std::ofstream _tmpFileOs;
bool _writeSucceeded = false;
std::string _tmpFile;
std::string _destFile;
};

protected:

private:
//! Logger
Logger *_logger;

bool putFile(ProcessSession *session, FlowFileRecord *flowFile, const std::string &tmpFile, const std::string &destFile);
};

#endif
4 changes: 4 additions & 0 deletions libminifi/src/FlowController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
{
processor = new GetFile(name, uuid);
}
else if (name == PutFile::ProcessorName)
{
processor = new PutFile(name, uuid);
}
else if (name == TailFile::ProcessorName)
{
processor = new TailFile(name, uuid);
Expand Down
200 changes: 200 additions & 0 deletions libminifi/src/PutFile.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/**
* @file PutFile.cpp
* PutFile class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sstream>
#include <stdio.h>
#include <string>
#include <iostream>
#include <fstream>
#include <uuid/uuid.h>

#include "TimeUtil.h"
#include "PutFile.h"
#include "ProcessContext.h"
#include "ProcessSession.h"

const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE("replace");
const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_IGNORE("ignore");
const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail");

const std::string PutFile::ProcessorName("PutFile");

Property PutFile::Directory("Output Directory", "The output directory to which to put files", ".");
Property PutFile::ConflictResolution("Conflict Resolution Strategy", "Indicates what should happen when a file with the same name already exists in the output directory", CONFLICT_RESOLUTION_STRATEGY_FAIL);

Relationship PutFile::Success("success", "All files are routed to success");
Relationship PutFile::Failure("failure", "Failed files (conflict, write failure, etc.) are transferred to failure");

void PutFile::initialize()
{
//! Set the supported properties
std::set<Property> properties;
properties.insert(Directory);
properties.insert(ConflictResolution);
setSupportedProperties(properties);
//! Set the supported relationships
std::set<Relationship> relationships;
relationships.insert(Success);
relationships.insert(Failure);
setSupportedRelationships(relationships);
}

void PutFile::onTrigger(ProcessContext *context, ProcessSession *session)
{
std::string directory;

if (!context->getProperty(Directory.getName(), directory))
{
_logger->log_error("Directory attribute is missing or invalid");
return;
}

std::string conflictResolution;

if (!context->getProperty(ConflictResolution.getName(), conflictResolution))
{
_logger->log_error("Conflict Resolution Strategy attribute is missing or invalid");
return;
}

FlowFileRecord *flowFile = session->get();

// Do nothing if there are no incoming files
if (!flowFile)
{
return;
}

std::string filename;
flowFile->getAttribute(FILENAME, filename);

// Generate a safe (universally-unique) temporary filename on the same partition
char tmpFileUuidStr[37];
uuid_t tmpFileUuid;
uuid_generate(tmpFileUuid);
uuid_unparse(tmpFileUuid, tmpFileUuidStr);
std::stringstream tmpFileSs;
tmpFileSs << directory << "/." << filename << "." << tmpFileUuidStr;
std::string tmpFile = tmpFileSs.str();
_logger->log_info("PutFile using temporary file %s", tmpFile.c_str());

// Determine dest full file paths
std::stringstream destFileSs;
destFileSs << directory << "/" << filename;
std::string destFile = destFileSs.str();

_logger->log_info("PutFile writing file %s into directory %s", filename.c_str(), directory.c_str());

// If file exists, apply conflict resolution strategy
struct stat statResult;

if (stat(destFile.c_str(), &statResult) == 0)
{
_logger->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s", destFile.c_str(), conflictResolution.c_str());

if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_REPLACE)
{
putFile(session, flowFile, tmpFile, destFile);
}
else if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_IGNORE)
{
session->transfer(flowFile, Success);
}
else
{
session->transfer(flowFile, Failure);
}
}
else
{
putFile(session, flowFile, tmpFile, destFile);
}
}

bool PutFile::putFile(ProcessSession *session, FlowFileRecord *flowFile, const std::string &tmpFile, const std::string &destFile)
{

ReadCallback cb(tmpFile, destFile);
session->read(flowFile, &cb);

if (cb.commit())
{
session->transfer(flowFile, Success);
}
else
{
session->transfer(flowFile, Failure);
}
}

PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile)
: _tmpFile(tmpFile)
, _tmpFileOs(tmpFile)
, _destFile(destFile)
{
_logger = Logger::getLogger();
}

// Copy the entire file contents to the temporary file
void PutFile::ReadCallback::process(std::ifstream *stream)
{
// Copy file contents into tmp file
_writeSucceeded = false;
_tmpFileOs << stream->rdbuf();
_writeSucceeded = true;
}

// Renames tmp file to final destination
// Returns true if commit succeeded
bool PutFile::ReadCallback::commit()
{
bool success = false;

_logger->log_info("PutFile committing put file operation to %s", _destFile.c_str());

if (_writeSucceeded)
{
_tmpFileOs.close();

if (rename(_tmpFile.c_str(), _destFile.c_str()))
{
_logger->log_info("PutFile commit put file operation to %s failed because rename() call failed", _destFile.c_str());
}
else
{
success = true;
_logger->log_info("PutFile commit put file operation to %s succeeded", _destFile.c_str());
}
}
else
{
_logger->log_error("PutFile commit put file operation to %s failed because write failed", _destFile.c_str());
}

return success;
}

// Clean up resources
PutFile::ReadCallback::~ReadCallback() {
// Close tmp file
_tmpFileOs.close();

// Clean up tmp file, if necessary
unlink(_tmpFile.c_str());
}