diff --git a/docs/SCPRemote-action.md b/docs/SCPRemote-action.md index ea59456..65ec913 100644 --- a/docs/SCPRemote-action.md +++ b/docs/SCPRemote-action.md @@ -32,4 +32,4 @@ Properties Usage Notes -------- -In order to perform SCP between to remote hosts, we require a Bato. An SCP command based on the configuration supplied will be created to perform a compressed file copy. Authentication setup between all hosts will need to be setup before hand. This includes being able to SSH on the bastion host with the private key being supplied in the configuration and have the 2 remote hosts that files are being moved on having known host/authenticated_keys setup for SSH communication between them.stion host that we will ssh in \ No newline at end of file +In order to perform SCP between to remote hosts, we require a Bastion Host. An SCP command based on the configuration supplied will be created to perform a file copy. Authentication setup between all hosts will need to be setup before hand. This includes being able to SSH on the bastion host with the private key being supplied in the configuration and have the 2 remote hosts that files are being moved on having known host/authenticated_keys setup for SSH communication. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5002449..6667946 100644 --- a/pom.xml +++ b/pom.xml @@ -20,36 +20,31 @@ io.cdap.plugin sftp-actions-ssh jar - 1.5.1-SNAPSHOT + 1.5.0-SNAPSHOT SFTP Actions UTF-8 - 6.1.2 - 2.3.5 + 6.2.0 + 2.4.0 0.1.53 2.3.0 19.0 - widgets docs - [6.0.0-SNAPSHOT,7.0.0-SNAPSHOT) - system:cdap-data-pipeline, system:cdap-data-streams - ${project.basedir} - http://maven.apache.org - io.cdap.cdap @@ -80,8 +75,6 @@ ${cdap.version} test - - com.jcraft jsch @@ -104,16 +97,13 @@ 4.12 test - - - software.sham sham-ssh 0.1.0 + test - org.mockito @@ -122,7 +112,6 @@ test - diff --git a/src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java b/src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java index 29d0c09..13f8ae5 100644 --- a/src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java +++ b/src/main/java/io/cdap/plugin/SCPRemotetoRemoteAction.java @@ -1,7 +1,5 @@ -package io.cdap.plugin; - /* - * Copyright © 2017 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -16,136 +14,121 @@ * the License. */ -import com.google.common.annotations.VisibleForTesting; +package io.cdap.plugin; +import com.google.common.annotations.VisibleForTesting; import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; - import com.jcraft.jsch.Session; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; - import io.cdap.cdap.api.plugin.PluginConfig; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; - +import java.nio.charset.StandardCharsets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; - @Plugin(type = Action.PLUGIN_TYPE) @Name(SCPRemotetoRemoteAction.PLUGIN_NAME) @Description("This action will connect to a Bastion Host and execute a SCP command to copy a file from Host A to Host B.") public class SCPRemotetoRemoteAction extends Action { - public static final String PLUGIN_NAME = "SCPRemote"; private static final Logger LOG = LoggerFactory.getLogger(SCPRemotetoRemoteAction.class); - private final SCPRemotetoRemoteActionConfig config; @VisibleForTesting - public SCPRemotetoRemoteAction(SCPRemotetoRemoteActionConfig config) { - this.config = config; - } + public SCPRemotetoRemoteAction(SCPRemotetoRemoteActionConfig config) { this.config = config; } - /** - * This function is executed by the Pipelines framework when the Pipeline is deployed. This - * is a good place to validate any configuration options the user has entered. If this throws - * an exception, the Pipeline will not be deployed and the user will be shown the error message. - */ @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException { super.configurePipeline(pipelineConfigurer); - LOG.debug(String.format("Running the 'configurePipeline' method of the %s plugin.", PLUGIN_NAME)); + LOG.debug("Executing the 'run' method of the {} plugin", PLUGIN_NAME); config.validate(); } @Override public void run(ActionContext context) throws JSchException, IOException { - LOG.debug(String.format("Running the 'run' method of the %s plugin.", PLUGIN_NAME)); + LOG.debug("Running the 'run' method of the {} plugin.", PLUGIN_NAME); config.validate(); - byte[] key = config.getPrivateKey(); byte[] passphrase = config.getPassphrase(); JSch jsch = new JSch(); - jsch.addIdentity("key", key, null,passphrase); - Session session = jsch.getSession(config.getUserNameBastion(),config.getHostBastion(),config.getPort()); - + jsch.addIdentity("key", key, null, passphrase); + Session session = jsch.getSession(config.getUserNameBastion(), + config.getHostBastion(), config.getPort()); session.setConfig("StrictHostKeyChecking", "no"); session.connect(); ChannelExec channel = (ChannelExec) session.openChannel("exec"); - String userA = config.getUserNameA(); String hostA = config.getHostA(); String source = config.getSource(); - String pathSource = userA+"@"+hostA+":"+source; - + String pathSource = userA + "@" + hostA + ":" + source; String userB = config.getUserNameB(); String hostB = config.getHostB(); - String dest= config.getDest(); - String pathDest = userB+"@"+hostB+":"+dest; - + String dest = config.getDest(); + String pathDest = userB + "@" + hostB + ":" + dest; String dirFlag = config.getDirFlag(); + if (dirFlag.equals("directory-off")){ + dirFlag = ""; + } else { + dirFlag = "-r"; + } String compressFlag = config.getCompressFlag(); + if (compressFlag.equals("compression-off")){ + compressFlag = ""; + } else { + compressFlag = "-C"; + } String verboseFlag = config.getVerboseFlag(); - + if (verboseFlag.equals("verbose-off")){ + verboseFlag = ""; + } else { + verboseFlag = "-v"; + } //Host A -> Host B - channel.setCommand("scp "+compressFlag+ " "+verboseFlag+ - " " +dirFlag +" " +pathSource +" " +pathDest); + channel.setCommand("scp " + compressFlag + " " + verboseFlag + + " " + dirFlag + " " + pathSource + " " + pathDest); + channel.connect(); + verboseLogging(channel); + channel.disconnect(); + session.disconnect(); + } + private void verboseLogging(ChannelExec channel) throws IOException { + StringBuilder inputBuffer = new StringBuilder(); StringBuilder outputBuffer = new StringBuilder(); - StringBuilder errorBuffer = new StringBuilder(); - InputStream in = channel.getInputStream(); - InputStream err = channel.getExtInputStream(); - - channel.connect(); - + InputStream out = channel.getExtInputStream(); byte[] tmp = new byte[1024]; - while (true) { - while (in.available() > 0) { - int i = in.read(tmp, 0, 1024); - if (i < 0) break; - outputBuffer.append(new String(tmp, 0, i)); - } - while (err.available() > 0) { - int i = err.read(tmp, 0, 1024); - if (i < 0) break; - errorBuffer.append(new String(tmp, 0, i)); - } - if (channel.isClosed()) { - if ((in.available() > 0) || (err.available() > 0)) continue; - System.out.println("exit-status: " + channel.getExitStatus()); - break; - } - try { - Thread.sleep(1000); - } catch (Exception ignored) { - } + int lenIn = in.read(tmp, 0, tmp.length); + while (lenIn > 0){ + inputBuffer.append(new String(tmp, 0, lenIn, StandardCharsets.UTF_8)); + lenIn = in.read(tmp, 0, tmp.length); } - - LOG.debug("output: " + outputBuffer.toString()); - LOG.info("info: " + errorBuffer.toString()); - - channel.disconnect(); - session.disconnect(); - + int lenOut = out.read(tmp, 0, tmp.length); + while (lenOut > 0){ + outputBuffer.append(new String(tmp, 0, lenOut, StandardCharsets.UTF_8)); + lenOut = out.read(tmp, 0, tmp.length); + } + if (channel.isClosed()) { + LOG.info("Exit-Status: " + channel.getExitStatus()); + } + LOG.info("Input: " + inputBuffer.toString()); + LOG.info("Verbose Info: " + outputBuffer.toString()); } - /** * The config class for {@link SCPRemotetoRemoteAction} that contains all properties that need to be filled in by * the user when building a Pipeline. */ public static class SCPRemotetoRemoteActionConfig extends PluginConfig { - @Description("Hostname or IP Address of the SSH server.") @Macro public String hostBastion; @@ -168,7 +151,7 @@ public static class SCPRemotetoRemoteActionConfig extends PluginConfig { @Nullable public String passphrase; - @Description("Name of the user used to login to SSH server.") + @Description("Name of the user used to login to SSH server that contains the files to copy.") @Macro public String userNameA; @@ -176,11 +159,11 @@ public static class SCPRemotetoRemoteActionConfig extends PluginConfig { @Macro public String hostA; - @Description("Absolute path on Host A") + @Description("Absolute path on Host A to copy") @Macro public String sourcePath; - @Description("Name of the user used to login to SSH server.") + @Description("Name of the user used to login to SSH server that files should be copied to.") @Macro public String userNameB; @@ -192,20 +175,21 @@ public static class SCPRemotetoRemoteActionConfig extends PluginConfig { @Macro public String destPath; + @Nullable @Name("compressionFlag") @Description("Setting Compression Flag") public String compressFlag; + @Nullable @Name("verboseFlag") @Description("Setting Verbose Flag for more Log data") public String verboseFlag; + @Nullable @Name("directoryFlag") @Description("Setting Directory Flag") public String dirFlag; - - public String getHostBastion() { return hostBastion; } @@ -218,76 +202,32 @@ public String getUserNameBastion() { return userNameBastion; } - public byte[] getPrivateKey() { - assert privateKey != null; - return privateKey.getBytes(); - } + public byte[] getPrivateKey() { return privateKey.getBytes(StandardCharsets.UTF_8); } public byte[] getPassphrase(){ - if (passphrase == null){ - passphrase = ""; - } - return passphrase.getBytes(); - } + return passphrase == null ? new byte[0] : passphrase.getBytes(StandardCharsets.UTF_8); } + public String getUserNameA() { return userNameA; } - public String getUserNameA() { - return userNameA; - } - - public String getHostA() { - return hostA; - } - - public String getSource() { - return sourcePath; - } - - - public String getUserNameB() { - return userNameB; - } + public String getHostA() { return hostA; } - public String getHostB() { - return hostB; - } + public String getSource() { return sourcePath; } + public String getUserNameB() { return userNameB; } - public String getDest() { - return destPath; - } + public String getHostB() { return hostB; } - public String getCompressFlag() { - if (compressFlag.equals("compression-off")){ - return compressFlag = ""; - } - return compressFlag="-C"; - } + public String getDest() { return destPath; } - public String getVerboseFlag() { - if (verboseFlag.equals("verbose-off")){ - return verboseFlag = ""; - } - return verboseFlag="-v"; - } + public String getCompressFlag() { return compressFlag; } - public String getDirFlag() { - if (dirFlag.equals("directory-off")){ - return dirFlag = ""; - } - return dirFlag="-r"; - } + public String getVerboseFlag() { return verboseFlag; } - /** - * You can leverage this function to validate the configure options entered by the user. - */ + public String getDirFlag() { return dirFlag; } public void validate() throws IllegalArgumentException { // The containsMacro function can be used to check if there is a macro in the config option. // At runtime, the containsMacro function will always return false. - } } -} - - +} \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/SFTPCopyAction.java b/src/main/java/io/cdap/plugin/SFTPCopyAction.java index ffacd88..0b1f3aa 100644 --- a/src/main/java/io/cdap/plugin/SFTPCopyAction.java +++ b/src/main/java/io/cdap/plugin/SFTPCopyAction.java @@ -1,5 +1,5 @@ /* - * Copyright © 2017 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -37,7 +37,6 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; @@ -59,7 +58,6 @@ public class SFTPCopyAction extends Action { private static final Logger LOG = LoggerFactory.getLogger(SFTPCopyAction.class); private SFTPCopyActionConfig config; - public SFTPCopyAction(SFTPCopyActionConfig config) { this.config = config; } @@ -69,7 +67,6 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); config.validate(); } - /** * Configurations for the FTP copy action plugin. */ @@ -122,7 +119,6 @@ public Map getFileSystemProperties(){ if (fileSystemProperties == null || fileSystemProperties.isEmpty()) { return properties; } - KeyValueListParser kvParser = new KeyValueListParser("\\s*,\\s*", "=>"); for (KeyValue keyVal : kvParser.parse(fileSystemProperties)) { String key = keyVal.getKey(); @@ -131,9 +127,9 @@ public Map getFileSystemProperties(){ } return properties; } + public SFTPCopyActionConfig(String host, int port, String userName, String password, String sshProperties, String srcPath, String destDirectory, String authType){ - this.host = host; this.port = port; this.userName = userName; @@ -153,30 +149,26 @@ public void validate() throws IllegalArgumentException { @Override public void run(ActionContext context) throws Exception { Path destination = new Path(config.getDestDirectory()); - Configuration conf = new Configuration(); Map properties = config.getFileSystemProperties(); for (Map.Entry entry : properties.entrySet()) { conf.set(entry.getKey(), entry.getValue()); } - FileSystem fileSystem = FileSystem.get(conf); - destination = fileSystem.makeQualified(destination); if (!fileSystem.exists(destination)) { fileSystem.mkdirs(destination); } - if (config.getAuthTypeBeingUsed().equals("privatekey-select")) { - try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), - config.getPrivateKey(), config.getPassphrase(), config.getSSHProperties())) { + try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), + config.getUserName(), config.getPrivateKey(), config.getPassphrase(), config.getSSHProperties())) { sftpCopyLogic(fileSystem, destination, SFTPConnector, context); } catch (Exception e){ LOG.error(String.valueOf(e)); } } else { - try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), - config.getPassword(), config.getSSHProperties())) { + try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), + config.getUserName(), config.getPassword(), config.getSSHProperties())) { sftpCopyLogic(fileSystem, destination, SFTPConnector, context); } catch (Exception e) { LOG.error(String.valueOf(e)); @@ -184,14 +176,10 @@ public void run(ActionContext context) throws Exception { } } - public void sftpCopyLogic(FileSystem fileSystem, Path destination, SFTPConnector SFTPConnector, - ActionContext context) throws SftpException, IOException { - + ActionContext context) throws SftpException, IOException { ChannelSftp channelSftp = SFTPConnector.getSftpChannel(); - Vector files = channelSftp.ls(config.getSrcDirectory()); - List filesCopied = new ArrayList<>(); for (int index = 0; index < files.size(); index++) { Object obj = files.elementAt(index); @@ -203,7 +191,6 @@ public void sftpCopyLogic(FileSystem fileSystem, Path destination, SFTPConnector // ignore "." and ".." files continue; } - // Ignore files that don't match the given file regex if (!Strings.isNullOrEmpty(config.fileNameRegex)) { String fileName = entry.getFilename(); @@ -212,10 +199,8 @@ public void sftpCopyLogic(FileSystem fileSystem, Path destination, SFTPConnector continue; } } - LOG.info("Downloading file {}", entry.getFilename()); String completeFileName = config.getSrcDirectory() + "/" + entry.getFilename(); - if (config.getExtractZipFiles() && entry.getFilename().endsWith(".zip")) { copyJschZip(channelSftp.get(completeFileName), fileSystem, destination); } else { @@ -245,4 +230,4 @@ private void copyJschZip(InputStream is, FileSystem fs, Path destination) throws } } } -} +} \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/SFTPDeleteAction.java b/src/main/java/io/cdap/plugin/SFTPDeleteAction.java index 85e0ca8..4e17482 100644 --- a/src/main/java/io/cdap/plugin/SFTPDeleteAction.java +++ b/src/main/java/io/cdap/plugin/SFTPDeleteAction.java @@ -1,3 +1,19 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package io.cdap.plugin; import com.jcraft.jsch.SftpException; @@ -20,10 +36,8 @@ @Plugin(type = Action.PLUGIN_TYPE) @Name("SFTPDelete") public class SFTPDeleteAction extends Action { - private static final Logger LOG = LoggerFactory.getLogger(SFTPDeleteAction.class); private SFTPDeleteActionConfig config; - public SFTPDeleteAction(SFTPDeleteActionConfig config) { this.config = config; } @@ -73,7 +87,6 @@ public void run(ActionContext context) throws Exception { if (config.getAuthTypeBeingUsed().equals("privatekey-select")) { try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), config.getPrivateKey(), config.getPassphrase(), config.getSSHProperties())) { - sftpDeleteLogic(filesToDelete, SFTPConnector); } catch (Exception e){ LOG.error(String.valueOf(e)); @@ -81,7 +94,6 @@ public void run(ActionContext context) throws Exception { } else { try (SFTPConnector SFTPConnector = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), config.getPassword(), config.getSSHProperties())) { - sftpDeleteLogic(filesToDelete, SFTPConnector); } catch (Exception e){ LOG.error(String.valueOf(e)); diff --git a/src/main/java/io/cdap/plugin/SFTPPutAction.java b/src/main/java/io/cdap/plugin/SFTPPutAction.java index 5cd362c..36d6b73 100644 --- a/src/main/java/io/cdap/plugin/SFTPPutAction.java +++ b/src/main/java/io/cdap/plugin/SFTPPutAction.java @@ -1,5 +1,5 @@ /* - * Copyright © 2017 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -33,7 +33,6 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.io.InputStream; import javax.annotation.Nullable; @@ -45,9 +44,7 @@ @Name("SFTPPut") public class SFTPPutAction extends Action { private static final Logger LOG = LoggerFactory.getLogger(SFTPPutAction.class); - private SFTPPutActionConfig config; - public SFTPPutAction(SFTPPutActionConfig config){ this.config = config; } @@ -57,7 +54,6 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); config.validate(); } - /** * Configurations for the SFTP put action plugin. */ @@ -88,7 +84,7 @@ public String getFileNameRegex() { } public SFTPPutActionConfig(String host, int port, String userName, String password, - String sshProperties, String srcPath, String destDirectory, String authType){ + String sshProperties, String srcPath, String destDirectory, String authType){ this.host = host; this.port = port; this.userName = userName; @@ -99,9 +95,6 @@ public SFTPPutActionConfig(String host, int port, String userName, String passwo this.authTypeBeingUsed = authType; } - /** - * Validates the config parameters required for unloading the data. - */ public void validate() throws IllegalArgumentException { // Check for required parameters // Check for required params for each action @@ -115,7 +108,6 @@ public void run(ActionContext context) throws Exception { if (!fileSystem.exists(source)) { throw new RuntimeException(String.format("Source Path doesn't exist at %s", source)); } - if (config.getAuthTypeBeingUsed().equals("privatekey-select")) { try (SFTPConnector sftp = new SFTPConnector(config.getHost(), config.getPort(), config.getUserName(), config .getPrivateKey(), config.getPassphrase(), config.getSSHProperties())) { @@ -133,23 +125,20 @@ public void run(ActionContext context) throws Exception { } } - private void sftpPutLogic(FileSystem fileSystem, Path source, SFTPConnector sftp) throws SftpException, IOException { + private void sftpPutLogic(FileSystem fileSystem, Path source, SFTPConnector sftp) + throws SftpException, IOException { ChannelSftp channel = sftp.getSftpChannel(); - try { channel.mkdir(config.getDestDirectory()); } catch (SftpException ex) { // Suppress since the directory might already exist. } - channel.cd(config.getDestDirectory()); - // Filter out only the files to copy FileStatus[] filesToCopy = fileSystem.listStatus(source, path -> { String fileName = path.getName(); return fileName.matches(config.getFileNameRegex()); }); - for (FileStatus file : filesToCopy) { Path filePath = file.getPath(); try (InputStream inputStream = fileSystem.open(filePath)) { @@ -157,4 +146,4 @@ private void sftpPutLogic(FileSystem fileSystem, Path source, SFTPConnector sftp } } } -} +} \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java b/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java index 2dfd24f..91dd1ef 100644 --- a/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java +++ b/src/main/java/io/cdap/plugin/common/SFTPActionConfig.java @@ -1,5 +1,5 @@ /* - * Copyright © 2017 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -21,6 +21,7 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.dataset.lib.KeyValue; import io.cdap.cdap.api.plugin.PluginConfig; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; @@ -73,9 +74,7 @@ public String getHost() { return host; } - public int getPort() { - return (port != null) ? port : 22; - } + public int getPort() { return (port != null) ? port : 22; } public String getUserName() { return userName; @@ -85,19 +84,12 @@ public String getPassword() { return password; } - public byte[] getPrivateKey() { - assert privateKey != null; - return privateKey.getBytes(); - } + public byte[] getPrivateKey() { return privateKey.getBytes(StandardCharsets.UTF_8); } public String getAuthTypeBeingUsed() { return authTypeBeingUsed; } public byte[] getPassphrase(){ - if (passphrase == null){ - passphrase = ""; - } - return passphrase.getBytes(); - } + return passphrase == null ? new byte[0] : passphrase.getBytes(StandardCharsets.UTF_8); } public Map getSSHProperties(){ Map properties = new HashMap<>(); @@ -106,7 +98,6 @@ public Map getSSHProperties(){ if (sshProperties == null || sshProperties.isEmpty()) { return properties; } - KeyValueListParser kvParser = new KeyValueListParser("\\s*,\\s*", ":"); for (KeyValue keyVal : kvParser.parse(sshProperties)) { String key = keyVal.getKey(); diff --git a/src/main/java/io/cdap/plugin/common/SFTPConnector.java b/src/main/java/io/cdap/plugin/common/SFTPConnector.java index 5d256f5..981964a 100644 --- a/src/main/java/io/cdap/plugin/common/SFTPConnector.java +++ b/src/main/java/io/cdap/plugin/common/SFTPConnector.java @@ -1,5 +1,5 @@ /* - * Copyright © 2017 Cask Data, Inc. + * Copyright © 2019 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -34,11 +34,9 @@ public class SFTPConnector implements AutoCloseable { private static Channel channel; private final Session session; - //Connector Object to be used for Auth with Password - public SFTPConnector(String host, int port, String userName, String password, - Map sessionProperties) - throws Exception { + public SFTPConnector(String host, int port, String userName, String password, + Map sessionProperties) throws Exception { JSch jsch = new JSch(); this.session = jsch.getSession(userName, host, port); session.setPassword(password); @@ -51,12 +49,11 @@ public SFTPConnector(String host, int port, String userName, String password, channel = session.openChannel("sftp"); channel.connect(); } - // Connector Object to be used for Auth with SSH privatekey. + // Connector Object to be used for Auth with SSH PrivateKey. public SFTPConnector(String host, int port, String userName, byte[] privateKey, - byte[] passphrase, Map sessionProperties) - throws Exception { + byte[] passphrase, Map sessionProperties) throws Exception { JSch jsch = new JSch(); - jsch.addIdentity("key", privateKey,null,passphrase); + jsch.addIdentity("key", privateKey,null, passphrase); this.session = jsch.getSession(userName, host, port); LOG.info("Properties {}", sessionProperties); Properties properties = new Properties(); @@ -67,13 +64,10 @@ public SFTPConnector(String host, int port, String userName, byte[] privateKey, channel = session.openChannel("sftp"); channel.connect(); } - /** * Get the established sftp channel to perform operations. */ - public static ChannelSftp getSftpChannel() { - return (ChannelSftp) channel; - } + public static ChannelSftp getSftpChannel() { return (ChannelSftp) channel; } @Override public void close() { @@ -85,7 +79,6 @@ public void close() { LOG.warn("Error while disconnecting sftp channel.", t); } } - if (session != null) { try { session.disconnect(); diff --git a/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java b/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java index 9679a1b..8350986 100644 --- a/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java +++ b/src/main/test/io.cdap.plugin/SFTPCopyActionTest.java @@ -1,30 +1,39 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package io.cdap.plugin; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; - import io.cdap.cdap.etl.mock.action.MockActionContext; import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer; import org.junit.After; import org.junit.Before; - - import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import software.sham.sftp.MockSftpServer; - import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Properties; - - public class SFTPCopyActionTest { - MockSftpServer server; Session sshSession; @@ -55,23 +64,15 @@ public void stopSftp() throws IOException { public void testCopyFile() throws Exception { File tempFile = tempFolder.newFile(); Files.write(tempFile.toPath(), "test".getBytes(StandardCharsets.UTF_8)); - String sourcePath = tempFile.getAbsoluteFile().toString(); String destPath = server.getBaseDirectory().toString(); SFTPCopyAction.SFTPCopyActionConfig config = new SFTPCopyAction.SFTPCopyActionConfig( - "localhost", - 9022, - "tester", - "testing", - "", - sourcePath, - destPath, - "password"); + "localhost", 9022, "tester", "testing", "", + sourcePath, destPath, "password"); MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); new SFTPCopyAction(config).configurePipeline(configurer); new SFTPCopyAction(config).run(new MockActionContext()); } - } diff --git a/src/main/test/io.cdap.plugin/SFTPDeleteTest.java b/src/main/test/io.cdap.plugin/SFTPDeleteTest.java index b24d7c9..8258ebb 100644 --- a/src/main/test/io.cdap.plugin/SFTPDeleteTest.java +++ b/src/main/test/io.cdap.plugin/SFTPDeleteTest.java @@ -1,64 +1,67 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package io.cdap.plugin; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; - import io.cdap.cdap.etl.mock.action.MockActionContext; import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer; import org.junit.After; import org.junit.Before; - - import org.junit.Test; import software.sham.sftp.MockSftpServer; - import java.io.IOException; import java.util.Properties; - - public class SFTPDeleteTest { - MockSftpServer server; Session sshSession; @Before public void initSftp() throws IOException { - server = new MockSftpServer(9022); + server = new MockSftpServer(9022); } @Before public void initSshClient() throws JSchException { - JSch jsch = new JSch(); - sshSession = jsch.getSession("tester", "localhost", 9022); - Properties config = new Properties(); - config.setProperty("StrictHostKeyChecking", "no"); - sshSession.setConfig(config); - sshSession.setPassword("testing"); - sshSession.connect(); + JSch jsch = new JSch(); + sshSession = jsch.getSession("tester", "localhost", 9022); + Properties config = new Properties(); + config.setProperty("StrictHostKeyChecking", "no"); + sshSession.setConfig(config); + sshSession.setPassword("testing"); + sshSession.connect(); } @After public void stopSftp() throws IOException { - server.stop(); + server.stop(); } - @Test - public void testCopyFile() throws Exception { - SFTPDeleteAction.SFTPDeleteActionConfig config = new SFTPDeleteAction.SFTPDeleteActionConfig( - "localhost", - 9022, - "tester", - "testing", - "", - "", - "password"); - MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); - new SFTPDeleteAction(config).configurePipeline(configurer); - new SFTPDeleteAction(config).run(new MockActionContext()); + @Test + public void testCopyFile() throws Exception { + SFTPDeleteAction.SFTPDeleteActionConfig config = new SFTPDeleteAction.SFTPDeleteActionConfig( + "localhost", 9022, "tester", "testing", + "", "", "password"); + MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); + new SFTPDeleteAction(config).configurePipeline(configurer); + new SFTPDeleteAction(config).run(new MockActionContext()); } - } diff --git a/src/main/test/io.cdap.plugin/SFTPPutActionTest.java b/src/main/test/io.cdap.plugin/SFTPPutActionTest.java index 0ed1211..1a17f7a 100644 --- a/src/main/test/io.cdap.plugin/SFTPPutActionTest.java +++ b/src/main/test/io.cdap.plugin/SFTPPutActionTest.java @@ -1,5 +1,20 @@ -package io.cdap.plugin; +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; @@ -9,7 +24,6 @@ import org.junit.*; import org.junit.rules.TemporaryFolder; import software.sham.sftp.MockSftpServer; - import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -18,10 +32,7 @@ import java.util.Map; import java.util.Properties; - - public class SFTPPutActionTest { - MockSftpServer server; Session sshSession; @@ -57,18 +68,11 @@ public void testPutFile() throws Exception { Files.write(tempFile.toPath(), "test".getBytes(StandardCharsets.UTF_8)); String destPath = server.getBaseDirectory().toString(); SFTPPutAction.SFTPPutActionConfig config = new SFTPPutAction.SFTPPutActionConfig( - "localhost", - 9022, - "tester", - "testing", - "", - tempFile.toString(), - destPath, - "password"); + "localhost", 9022, "tester", "testing", + "", tempFile.toString(), destPath, "password"); MockPipelineConfigurer configurer = new MockPipelineConfigurer(null); new SFTPPutAction(config).configurePipeline(configurer); new SFTPPutAction(config).run(new MockActionContext()); } - } diff --git a/widgets/SCPRemote-action.json b/widgets/SCPRemote-action.json index 452d07f..7a49e36 100644 --- a/widgets/SCPRemote-action.json +++ b/widgets/SCPRemote-action.json @@ -31,7 +31,6 @@ "label": "PrivateKey Passphrase", "name": "passphrase" }, - { "label": "Compression Flag", "name": "compressionFlag", @@ -96,13 +95,13 @@ "properties": [ { "widget-type": "textbox", - "label": "Host A User Name", - "name": "userNameA" + "label": "Host A Hostname", + "name": "hostA" }, { "widget-type": "textbox", - "label": "Host A Hostname", - "name": "hostA" + "label": "Host A User Name", + "name": "userNameA" }, { "widget-type": "textbox", @@ -111,19 +110,18 @@ } ] }, - { "label": "Remote Host B Properties", "properties": [ { "widget-type": "textbox", - "label": "Host B User Name", - "name": "userNameB" + "label": "Host B Hostname", + "name": "hostB" }, { "widget-type": "textbox", - "label": "Host B Hostname", - "name": "hostB" + "label": "Host B User Name", + "name": "userNameB" }, { "widget-type": "textbox", diff --git a/widgets/SFTPCopy-action.json b/widgets/SFTPCopy-action.json index e463d0c..1f1601b 100644 --- a/widgets/SFTPCopy-action.json +++ b/widgets/SFTPCopy-action.json @@ -77,7 +77,6 @@ "label": "Regex to match files that needs to be copied", "name" : "fileNameRegex" }, - { "widget-type": "keyvalue", "label": "Properties for SSH", @@ -135,4 +134,4 @@ ] } ] - } +}