Skip to content

Commit

Permalink
Archive pull files after processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Oct 29, 2015
1 parent aca9e8c commit bfa56a6
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 9 deletions.
53 changes: 44 additions & 9 deletions symmetric-client-clib/src/transport/file/FileIncomingTransport.c
Expand Up @@ -49,15 +49,16 @@ char* SymFileIncomingTransport_getIncomingFile(SymFileIncomingTransport *this, c
firstFile = files->get(files, 0);
}

// seems we have stack memory here for firstFile.
// This memory gets overwritten after it's returned.
if (firstFile) {
firstFile = SymStringUtils_format("%s", firstFile);
}

free(startFilter);
files->destroy(files);

if (firstFile) {
char *path = SymStringUtils_format("%s/%s", this->offlineIncomingDir, firstFile);
return path;
} else {
return firstFile;
}
return firstFile;
}

long SymFileIncomingTransport_process(SymFileIncomingTransport *this, SymDataProcessor *processor) {
Expand All @@ -66,22 +67,56 @@ long SymFileIncomingTransport_process(SymFileIncomingTransport *this, SymDataPro
char inputBuffer[BUFFER_SIZE];

char *fileName = SymFileIncomingTransport_getIncomingFile(this, ".csv");
if (!fileName || SymStringUtils_isBlank(fileName)) {
SymLog_info("No incoming files found at '%s'", this->offlineIncomingDir);
return SYM_TRANSPORT_SC_SERVICE_UNAVAILABLE;
}
char *path = SymStringUtils_format("%s/%s", this->offlineIncomingDir, fileName);

file = fopen(fileName,"r");
file = fopen(path,"r");

if (!file) {
SymLog_warn("Failed to load file %s", this->offlineIncomingDir);
SymLog_warn("Failed to load file '%s'", path);
return SYM_TRANSPORT_SC_SERVICE_UNAVAILABLE;
}

processor->open(processor);

unsigned short success = 1;

int count;
while ((count = fread(inputBuffer, sizeof(char), BUFFER_SIZE, file)) > 0) {
processor->process(processor, inputBuffer, sizeof(char), count);
int size = processor->process(processor, inputBuffer, sizeof(char), count);
if (size == 0) {
SymLog_warn("Failed to process file %s", this->offlineIncomingDir);
success = 0;
break;
}
}

processor->close(processor);
fclose(file);

if (success) {
if (SymStringUtils_isNotBlank(this->offlineArchiveDir)) {
char *archivePath = SymStringUtils_format("%s/%s", this->offlineArchiveDir, fileName);
int result = rename(path, archivePath);
if (result) {
SymLog_warn("Failed to archive '%s' to '%s'", path, archivePath);
}
} else {
int result = remove(path);
if (result) {
SymLog_warn("Failed to delete '%s'", path);
}
}
} else if (SymStringUtils_isNotBlank(this->offlineErrorDir)) {
char *errorPath = SymStringUtils_format("%s/%s", this->offlineErrorDir, fileName);
int result = rename(path, errorPath);
if (result) {
SymLog_warn("Failed to archive '%s' to '%s'", path, errorPath);
}
}

return SYM_TRANSPORT_OK;
}
Expand Down
1 change: 1 addition & 0 deletions symmetric-client-native/inc/SymClientNative.h
Expand Up @@ -29,6 +29,7 @@
#include "util/StringArray.h"
#include "util/StringUtils.h"
#include "util/Properties.h"
#include "common/Log.h"

int SymNativeClient_runSymmetricEngine(SymProperties *properties);

Expand Down

0 comments on commit bfa56a6

Please sign in to comment.