Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#345 Created CommandSteps #357

Merged
merged 3 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion metalus-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@
<exclude>junit:junit</exclude>
<exclude>jmock:*</exclude>
<exclude>*:xml-apis</exclude>
<exclude>com.thoughtworks.paranamer:*:jar:</exclude>
<!-- <exclude>com.thoughtworks.paranamer:*:jar:</exclude>-->
<exclude>com.fasterxml:*:jar:</exclude>
<exclude>org.apache.maven:lib:tests</exclude>
<exclude>org.apache.kafka:*:jar:</exclude>
Expand Down Expand Up @@ -237,6 +237,10 @@
<pattern>org.json4s</pattern>
<shadedPattern>metalus.org.json4s</shadedPattern>
</relocation>
<relocation>
<pattern>com.thoughtworks.paranamer</pattern>
<shadedPattern>metalus.com.thoughtworks.paranamer</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.acxiom.metalus.api

import com.acxiom.metalus.Constants
import org.json4s.{DefaultFormats, Formats}
import com.acxiom.metalus.fs.FileResource
import org.slf4j.LoggerFactory

import java.io.{BufferedInputStream, BufferedOutputStream, InputStream, OutputStream}
Expand Down Expand Up @@ -81,8 +81,6 @@ class HttpRestClient(hostUrl: String, authorization: Option[Authorization], allo
HttpsURLConnection.setDefaultHostnameVerifier(HttpRestClient.SELF_SIGNED_HOST_VERIFIER)
}

private implicit val formats: Formats = DefaultFormats

private val baseUrl = new URL(hostUrl)

baseUrl.getProtocol.toLowerCase match {
Expand Down Expand Up @@ -389,6 +387,14 @@ class HttpRestClient(hostUrl: String, authorization: Option[Authorization], allo
responseCode < 300
}

/**
* Wraps the provided path in an HttpFileResource that is bound to this client.
* The returned resource calls back into this client, so it uses the same
* host URL and authorization this client was constructed with.
*
* @param path The path to represent. It is passed through to the resource unchanged.
* @return A FileResource backed by this HttpRestClient.
*/
def toFileResource(path: String): FileResource = HttpFileResource(path, this)

private def openUrlConnection(path: String, headers: Option[Map[String, String]] = None): HttpURLConnection = {
val connection = new URL(baseUrl, path).openConnection().asInstanceOf[HttpURLConnection]
if (authorization.isDefined) {
Expand Down Expand Up @@ -454,3 +460,58 @@ case class HttpOutputStream(connection: HttpURLConnection) extends OutputStream
connection.disconnect()
}
}

/**
* A FileResource view of a single path served by an HttpRestClient. Every operation
* that touches the remote resource is delegated to the supplied client using the
* path exactly as it was given.
*
* @param path           The path this resource represents.
* @param httpRestClient The client used to perform the delete, size and stream operations.
*/
case class HttpFileResource(path: String, httpRestClient: HttpRestClient) extends FileResource {
/**
* The name of this file. This implementation returns the path as provided;
* no attempt is made to strip leading path segments.
*
* @return The path this resource was created with.
*/
override def fileName: String = path
/**
* Returns the full name of this file including path.
*
* @return The path this resource was created with.
*/
override def fullName: String = path
/**
* True if this represents a directory. This implementation never reports a directory.
*
* @return Always false.
*/
override def directory: Boolean = false
/**
* Will attempt to rename this file to the destination path.
* Renaming is not performed by this implementation.
*
* @param destPath The destination path. Ignored.
* @return Always false.
*/
override def rename(destPath: String): Boolean = false
/**
* Attempts to delete this file by calling delete on the client with this path.
*
* @return The result reported by the client.
*/
override def delete: Boolean = httpRestClient.delete(path)
/**
* Get the size of this file as reported by the client's getContentLength call.
*
* @return size of this file.
*/
override def size: Long = httpRestClient.getContentLength(path)
/**
* Creates an input stream for this file by delegating to the client.
*
* @param bufferSize The buffer size handed to the client when opening the stream
* @return The input stream returned by the client
*/
override def getInputStream(bufferSize: Int): InputStream = httpRestClient.getInputStream(path, bufferSize)
/**
* Creates an output stream for this file by delegating to the client.
*
* @param append     Not forwarded to the client; this implementation ignores the flag.
* @param bufferSize The buffer size handed to the client when opening the stream
* @return The output stream returned by the client
*/
override def getOutputStream(append: Boolean, bufferSize: Int): OutputStream = httpRestClient.getOutputStream(path, bufferSize)
}
41 changes: 21 additions & 20 deletions metalus-core/src/main/scala/com/acxiom/metalus/fs/FileManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@ object FileManager {
val DEFAULT_BUFFER_SIZE: Int = 65536
val DEFAULT_COPY_BUFFER_SIZE: Int = 32768
def apply(): FileManager = new LocalFileManager

/**
 * Recursively deletes the provided file or directory. Files are removed as they are
 * visited and each directory is removed after its contents have been processed.
 *
 * @param file The file or directory to delete.
 * @throws java.io.IOException if an entry cannot be deleted or a directory cannot be fully listed.
 */
def deleteNio(file: File): Unit = {
  Files.walkFileTree(file.toPath,
    new SimpleFileVisitor[Path] {
      override def visitFile(file: Path,
                             attrs: BasicFileAttributes
                            ): FileVisitResult = {
        Files.delete(file)
        FileVisitResult.CONTINUE
      }

      override def postVisitDirectory(dir: Path,
                                      exc: IOException
                                     ): FileVisitResult = {
        // A non-null exc means iterating this directory ended early. Surface that
        // error instead of masking it with the failure from deleting a non-empty directory.
        Option(exc).foreach(e => throw e)
        Files.delete(dir)
        FileVisitResult.CONTINUE
      }
    }
  )
}
}

/**
Expand Down Expand Up @@ -205,7 +225,7 @@ case class LocalFileResource(file: File) extends FileResource {
*/
override def delete: Boolean = {
if (file.isDirectory) {
deleteNio()
FileManager.deleteNio(file)
true
} else {
file.delete()
Expand Down Expand Up @@ -244,25 +264,6 @@ case class LocalFileResource(file: File) extends FileResource {
case _ => super.copy(destination)
}
}
/**
 * Recursively deletes the file or directory represented by this resource. Files are
 * removed as they are visited and each directory is removed after its contents.
 *
 * @throws java.io.IOException if an entry cannot be deleted or a directory cannot be fully listed.
 */
private def deleteNio(): Unit = {
  Files.walkFileTree(file.toPath,
    new SimpleFileVisitor[Path] {
      override def visitFile(file: Path,
                             attrs: BasicFileAttributes
                            ): FileVisitResult = {
        Files.delete(file)
        FileVisitResult.CONTINUE
      }

      override def postVisitDirectory(dir: Path,
                                      exc: IOException
                                     ): FileVisitResult = {
        // A non-null exc means iterating this directory ended early. Surface that
        // error instead of masking it with the failure from deleting a non-empty directory.
        Option(exc).foreach(e => throw e)
        Files.delete(dir)
        FileVisitResult.CONTINUE
      }
    }
  )
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.acxiom.metalus.steps

import com.acxiom.metalus.{PipelineContext, PipelineException, PipelineStepResponse}
import com.acxiom.metalus.annotations.{StepFunction, StepObject, StepParameter, StepParameters, StepResults}

import scala.io.Source
import scala.jdk.CollectionConverters._

@StepObject
object FunctionSteps {
  /**
   * Executes the provided command on the local machine and returns its exit code.
   *
   * The argument list is the command followed by one entry per parameter, ordered by
   * position (parameters without a position sort as 0). Each entry is the optional
   * nameDash prefix plus the name, followed by a space and the value when one is set.
   *
   * Both output streams are fully consumed before waiting on the process so that a
   * command producing a large amount of output cannot block on a full pipe.
   *
   * @param command         The Command object containing the execution information.
   * @param pipelineContext The current pipeline context, used to populate the exception on failure.
   * @return A response whose primary value is the exit code. The secondary map contains
   *         "stdOut" and/or "stdErr" when the matching include flag is set, otherwise None.
   * @throws PipelineException if the process exits with a non-zero code; the message is the std error output.
   */
  @StepFunction("d252c782-afbd-4eef-9f59-c2e99677b8e6",
    "Execute Local Command",
    "Executes the provided command on the local machine",
    "Pipeline", "Utilities", List[String]("batch"))
  @StepParameters(Map("command" -> StepParameter(None, Some(true), None, None, None, None, Some("The Command object containing the execution information"))))
  @StepResults(primaryType = "Int", secondaryTypes = Some(Map("stdOut" -> "String", "stdErr" -> "String")))
  def executeCommand(command: Command, pipelineContext: PipelineContext): PipelineStepResponse = {
    val parametersList = command.parameters.sortBy(_.position.getOrElse(0)).map { p =>
      val flag = s"${p.nameDash.getOrElse("")}${p.name}"
      p.value.fold(flag)(v => s"$flag $v")
    }
    val processBuilder = new ProcessBuilder((command.command :: parametersList).asJava)
    // Set the environment variables
    command.environmentVariables.getOrElse(List())
      .foreach(env => processBuilder.environment().put(env.name, env.value.getOrElse("").toString))
    // Start the process
    val process = processBuilder.start()
    // Drain std error on a helper thread while std out is read here. Reading only after
    // waitFor() can hang forever once the child fills either pipe buffer.
    val stdErrTask = new java.util.concurrent.FutureTask[String](
      new java.util.concurrent.Callable[String] {
        override def call(): String = readFully(process.getErrorStream)
      })
    val stdErrThread = new Thread(stdErrTask, "executeCommand-stderr")
    stdErrThread.setDaemon(true)
    stdErrThread.start()
    val stdOut = readFully(process.getInputStream)
    val stdErr = try {
      stdErrTask.get()
    } catch {
      // Rethrow the reader's own failure rather than the wrapper added by the task.
      case e: java.util.concurrent.ExecutionException => throw Option(e.getCause).getOrElse(e)
    }
    // Both streams have reached end-of-file, so this only collects the exit code
    val exitCode = process.waitFor()
    if (exitCode != 0) {
      throw PipelineException(message = Some(stdErr),
        pipelineProgress = pipelineContext.currentStateInfo)
    }
    // Map the secondary returns
    val secondary: Map[String, Any] = List(
      if (command.includeStdOut) Some("stdOut" -> stdOut) else None,
      if (command.includeStdErr) Some("stdErr" -> stdErr) else None
    ).flatten.toMap
    PipelineStepResponse(Some(exitCode), if (secondary.nonEmpty) Some(secondary) else None)
  }

  /** Reads the stream to its end as a String and closes the underlying Source. */
  private def readFully(stream: java.io.InputStream): String = {
    val source = Source.fromInputStream(stream)
    try source.mkString finally source.close()
  }
}

/**
* Represents a command to be executed.
*
* @param command              The command to execute. It is placed first in the argument list.
* @param parameters           A list of parameters to pass to the command.
* @param includeStdOut        Boolean flag indicating whether std out should be returned. Defaults to false.
* @param includeStdErr        Boolean flag indicating whether std error should be returned. Defaults to false.
* @param environmentVariables Optional list of environment variables to set. This may be ignored by some commands.
*/
case class Command(command: String,
parameters: List[CommandParameter],
includeStdOut: Boolean = false,
includeStdErr: Boolean = false,
environmentVariables: Option[List[CommandParameter]] = None)

/**
* Describes a single parameter (or environment variable) for a command being executed.
*
* @param name     The name of the parameter.
* @param value    The value that should be presented after the name. If a value isn't required, just use name.
* @param position Used to ensure the parameters are added in the correct order. Some command steps may ignore this.
* @param nameDash Used to put a dash or double dash in front of name if required.
*/
case class CommandParameter(name: String, value: Option[Any], position: Option[Int] = None, nameDash: Option[String] = None)
Loading