Skip to content

Commit

Permalink
Implement ftp file listener in Ballerina
Browse files Browse the repository at this point in the history
Fixes wso2#27
  • Loading branch information
Nipunich committed Jun 24, 2019
1 parent cdc24e3 commit f3b12c3
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 136 deletions.
106 changes: 106 additions & 0 deletions examples/guides/file-integration/ftp-listener/ftp_listener.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import wso2/ftp;
import ballerina/log;
import ballerina/io;
import ballerina/internal;
import ballerina/config;
import ballerina/http;
import wso2ftputils;

string fileNamePattern = ".*.json";
string destFolder = "/movedFolder";

// Creating a ftp listener instance by defining the configuration.
listener ftp:Listener remoteServer = new({
protocol: ftp:FTP,
host: config:getAsString("FTP_HOST"),
port: config:getAsInt("FTP_LISTENER_PORT"),
pollingInterval:config:getAsInt("FTP_POLLING_INTERVAL"),
fileNamePattern:fileNamePattern,
secureSocket: {
basicAuth: {
username: config:getAsString("FTP_USERNAME"),
password: config:getAsString("FTP_PASSWORD")
}
},
path: "/newFolder"
});

// Defining the configuration of the ftp client endpoint.
ftp:ClientEndpointConfig ftpConfig = {
protocol: ftp:FTP,
host: config:getAsString("FTP_HOST"),
port: config:getAsInt("FTP_LISTENER_PORT"),
secureSocket: {
basicAuth: {
username: config:getAsString("FTP_USERNAME"),
password: config:getAsString("FTP_PASSWORD")
}
}
};

ftp:Client ftpClient = new(ftpConfig);

service monitor on remoteServer {
resource function fileResource(ftp:WatchEvent m) {
foreach ftp:FileInfo v1 in m.addedFiles {
log:printInfo("Added file path: " + v1.path);

processFile(untaint v1.path);

string destFilePath = createDestPath(v1);

// Moving the file to another location on the same ftp server after processing.
error? renameErr = ftpClient->rename(v1.path, destFilePath);
}

foreach string v2 in m.deletedFiles {
log:printInfo("Deleted file path: " + v2);
}
}
}

// Processing logic that needs to be done on the file content based on the file type.
public function processFile(string sourcePath) {

var getResult = ftpClient->get(sourcePath);

if(getResult is io:ReadableByteChannel){
json|error jsonFileRes = wso2ftputils:readJsonFile(getResult);

if(jsonFileRes is json) {
log:printInfo("File read successfully");
}else {
log:printError("Error in reading file", err = jsonFileRes);
}
} else {
log:printError("Error in reading file");
}
}


// Generating the file name of the processed file.
public function createDestPath(ftp:FileInfo v2) returns string {
int subString = v2.path.lastIndexOf("/");
int length = v2.path.length();
string subPath = v2.path.substring((subString + 1), length);
string destPath = destFolder + "/" + subPath;

return destPath;
}

136 changes: 0 additions & 136 deletions examples/guides/file-integration/ftp_listener.bal

This file was deleted.

31 changes: 31 additions & 0 deletions examples/guides/file-integration/wso2ftputils/ftp_utils.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import ballerina/log;
import ballerina/io;
import ballerina/config;
import ballerina/http;

public function readJsonFile(io:ReadableByteChannel result) returns json|error {

io:ReadableCharacterChannel? charChannelResult = getCharChannel(result);
var resultJson = charChannelResult.readJson();

if (resultJson is json) {
io:println("File content: ", resultJson);
return resultJson;
} else {
log:printError("An error occured.", err = resultJson);
return resultJson;
}
}

public function getCharChannel(io:ReadableByteChannel getResult) returns io:ReadableCharacterChannel? {

io:ReadableCharacterChannel? charChannel = new io:ReadableCharacterChannel(getResult, "utf-8");

if (charChannel is io:ReadableCharacterChannel) {
return charChannel;
} else {
log:printError("An error occured.");
return;
}
}

0 comments on commit f3b12c3

Please sign in to comment.