Skip to content

Commit

Permalink
Merge pull request #357 from dafreels/metalus_2_0_0
Browse files Browse the repository at this point in the history
#345 Created CommandSteps
  • Loading branch information
dafreels committed Mar 3, 2023
2 parents 9aec4ea + aa7e10a commit 3150e1d
Show file tree
Hide file tree
Showing 31 changed files with 1,278 additions and 698 deletions.
6 changes: 5 additions & 1 deletion metalus-core/pom.xml
Expand Up @@ -205,7 +205,7 @@
<exclude>junit:junit</exclude>
<exclude>jmock:*</exclude>
<exclude>*:xml-apis</exclude>
<exclude>com.thoughtworks.paranamer:*:jar:</exclude>
<!-- <exclude>com.thoughtworks.paranamer:*:jar:</exclude>-->
<exclude>com.fasterxml:*:jar:</exclude>
<exclude>org.apache.maven:lib:tests</exclude>
<exclude>org.apache.kafka:*:jar:</exclude>
Expand Down Expand Up @@ -237,6 +237,10 @@
<pattern>org.json4s</pattern>
<shadedPattern>metalus.org.json4s</shadedPattern>
</relocation>
<relocation>
<pattern>com.thoughtworks.paranamer</pattern>
<shadedPattern>metalus.com.thoughtworks.paranamer</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
Expand Down
@@ -1,7 +1,7 @@
package com.acxiom.metalus.api

import com.acxiom.metalus.Constants
import org.json4s.{DefaultFormats, Formats}
import com.acxiom.metalus.fs.FileResource
import org.slf4j.LoggerFactory

import java.io.{BufferedInputStream, BufferedOutputStream, InputStream, OutputStream}
Expand Down Expand Up @@ -81,8 +81,6 @@ class HttpRestClient(hostUrl: String, authorization: Option[Authorization], allo
HttpsURLConnection.setDefaultHostnameVerifier(HttpRestClient.SELF_SIGNED_HOST_VERIFIER)
}

private implicit val formats: Formats = DefaultFormats

private val baseUrl = new URL(hostUrl)

baseUrl.getProtocol.toLowerCase match {
Expand Down Expand Up @@ -389,6 +387,14 @@ class HttpRestClient(hostUrl: String, authorization: Option[Authorization], allo
responseCode < 300
}

/**
 * Wraps the provided path in an HttpFileResource that is bound to this client, so the path can be
 * used through the generic FileResource API.
 *
 * @param path The path to represent. It is handed to the HttpFileResource unchanged.
 * @return A FileResource representation backed by this client.
 */
def toFileResource(path: String): FileResource = HttpFileResource(path, this)

private def openUrlConnection(path: String, headers: Option[Map[String, String]] = None): HttpURLConnection = {
val connection = new URL(baseUrl, path).openConnection().asInstanceOf[HttpURLConnection]
if (authorization.isDefined) {
Expand Down Expand Up @@ -454,3 +460,58 @@ case class HttpOutputStream(connection: HttpURLConnection) extends OutputStream
connection.disconnect()
}
}

/**
 * A FileResource view of a single path served through an HttpRestClient. Every operation is
 * delegated to the client using the path supplied at construction.
 *
 * @param path           The path of the remote resource, passed unchanged to the client.
 * @param httpRestClient The client used to perform the operations.
 */
case class HttpFileResource(path: String, httpRestClient: HttpRestClient) extends FileResource {
  /**
   * The simple name of this file. Does not include the full path.
   *
   * @return The last segment of the path (a single trailing slash is ignored).
   */
  override def fileName: String = {
    // Previously this returned the whole path, contradicting the documented contract.
    val trimmed = path.stripSuffix("/")
    // lastIndexOf yields -1 when there is no slash, so the whole string is returned in that case.
    trimmed.substring(trimmed.lastIndexOf('/') + 1)
  }

  /**
   * Returns the full name of this file including path.
   *
   * @return The path exactly as it was provided.
   */
  override def fullName: String = path

  /**
   * True if this represents a directory.
   *
   * @return Always false for this implementation.
   */
  override def directory: Boolean = false

  /**
   * Will attempt to rename this file to the destination path.
   *
   * @param destPath The destination path. Unused, this implementation never renames.
   * @return Always false.
   */
  override def rename(destPath: String): Boolean = false

  /**
   * Attempts to delete this file by delegating to the client's delete call.
   *
   * @return True if the path could be deleted.
   */
  override def delete: Boolean = httpRestClient.delete(path)

  /**
   * Get the size of this file as reported by the client's content length lookup.
   *
   * @return size of this file.
   */
  override def size: Long = httpRestClient.getContentLength(path)

  /**
   * Creates a buffered input stream for this file.
   *
   * @param bufferSize The buffer size to apply to the stream
   * @return A buffered input stream
   */
  override def getInputStream(bufferSize: Int): InputStream = httpRestClient.getInputStream(path, bufferSize)

  /**
   * Creates a buffered output stream for this file.
   *
   * @param append     Not forwarded to the client, so it has no effect in this implementation.
   * @param bufferSize The buffer size to apply to the stream
   * @return A buffered output stream
   */
  override def getOutputStream(append: Boolean, bufferSize: Int): OutputStream =
    httpRestClient.getOutputStream(path, bufferSize)
}
41 changes: 21 additions & 20 deletions metalus-core/src/main/scala/com/acxiom/metalus/fs/FileManager.scala
Expand Up @@ -10,6 +10,26 @@ object FileManager {
val DEFAULT_BUFFER_SIZE: Int = 65536
val DEFAULT_COPY_BUFFER_SIZE: Int = 32768
def apply(): FileManager = new LocalFileManager

/**
 * Recursively deletes the provided file or directory. Files are removed as they are visited and
 * each directory is removed after its contents have been processed.
 *
 * @param file The file or directory to delete.
 * @throws IOException if an entry cannot be deleted or a directory cannot be listed.
 */
def deleteNio(file: File): Unit = {
  Files.walkFileTree(file.toPath,
    new SimpleFileVisitor[Path] {
      override def visitFile(path: Path, attrs: BasicFileAttributes): FileVisitResult = {
        Files.delete(path)
        FileVisitResult.CONTINUE
      }

      override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
        // A non-null exc means the directory listing ended early. Surface that root cause
        // instead of letting the delete below fail with a misleading "directory not empty".
        if (exc != null) { // scalastyle:ignore null
          throw exc
        }
        Files.delete(dir)
        FileVisitResult.CONTINUE
      }
    }
  )
}
}

/**
Expand Down Expand Up @@ -205,7 +225,7 @@ case class LocalFileResource(file: File) extends FileResource {
*/
override def delete: Boolean = {
if (file.isDirectory) {
deleteNio()
FileManager.deleteNio(file)
true
} else {
file.delete()
Expand Down Expand Up @@ -244,25 +264,6 @@ case class LocalFileResource(file: File) extends FileResource {
case _ => super.copy(destination)
}
}
/**
 * Recursively deletes the file represented by this resource. Files are removed as they are
 * visited and each directory is removed after its contents have been processed.
 */
private def deleteNio(): Unit = {
  Files.walkFileTree(file.toPath,
    new SimpleFileVisitor[Path] {
      override def visitFile(path: Path, attrs: BasicFileAttributes): FileVisitResult = {
        Files.delete(path)
        FileVisitResult.CONTINUE
      }

      override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
        // A non-null exc means the directory listing ended early. Surface that root cause
        // instead of letting the delete below fail with a misleading "directory not empty".
        if (exc != null) { // scalastyle:ignore null
          throw exc
        }
        Files.delete(dir)
        FileVisitResult.CONTINUE
      }
    }
  )
}
}

/**
Expand Down
@@ -0,0 +1,83 @@
package com.acxiom.metalus.steps

import com.acxiom.metalus.{PipelineContext, PipelineException, PipelineStepResponse}
import com.acxiom.metalus.annotations.{StepFunction, StepObject, StepParameter, StepParameters, StepResults}

import scala.io.Source
import scala.jdk.CollectionConverters._

@StepObject
object FunctionSteps {
  /**
   * Runs the described command as a local process and returns its exit code.
   *
   * The argument list is the command followed by one argument per parameter, ordered by position
   * (parameters without a position sort as 0). Both output streams are drained while the process
   * runs; waiting for exit first can block forever once the child fills a pipe buffer.
   *
   * @param command         The Command object containing the execution information.
   * @param pipelineContext The current pipeline context, used to report progress on failure.
   * @return The exit code as the primary return, with "stdOut" and/or "stdErr" as secondary
   *         returns when requested by the command.
   * @throws PipelineException if the process exits with a non-zero code. The message is the
   *                           captured std error output.
   */
  @StepFunction("d252c782-afbd-4eef-9f59-c2e99677b8e6",
    "Execute Local Command",
    "Executes the provided command on the local machine",
    "Pipeline", "Utilities", List[String]("batch"))
  @StepParameters(Map("command" -> StepParameter(None, Some(true), None, None, None, None, Some("The Command object containing the execution information"))))
  @StepResults(primaryType = "Int", secondaryTypes = Some(Map("stdOut" -> "String", "stdErr" -> "String")))
  def executeCommand(command: Command, pipelineContext: PipelineContext): PipelineStepResponse = {
    import java.util.concurrent.CompletableFuture
    import java.util.function.Supplier

    // NOTE(review): a parameter with a value is passed as ONE argument ("<dash><name> <value>").
    // Confirm this is intended, many programs expect the name and value as separate arguments.
    val parametersList = command.parameters.sortBy(_.position.getOrElse(0)).map { p =>
      val flag = s"${p.nameDash.getOrElse("")}${p.name}"
      p.value.fold(flag)(v => s"$flag $v")
    }
    val processBuilder = new ProcessBuilder((command.command :: parametersList).asJava)
    // Set the environment variables
    command.environmentVariables.getOrElse(List())
      .foreach(env => processBuilder.environment().put(env.name, env.value.getOrElse("").toString))
    // Start the process
    val process = processBuilder.start()
    // Drain std error on another thread while std out is drained here so neither pipe can fill up
    val stdErrFuture = CompletableFuture.supplyAsync(new Supplier[String] {
      override def get(): String = readFully(process.getErrorStream)
    })
    val stdOut = readFully(process.getInputStream)
    val stdErr = stdErrFuture.join()
    // Both streams have reached end of stream, now collect the exit code
    val exitCode = process.waitFor()
    if (exitCode != 0) {
      throw PipelineException(message = Some(stdErr),
        pipelineProgress = pipelineContext.currentStateInfo)
    }
    // Map the secondary returns
    val secondary: Map[String, Any] = List(
      (command.includeStdOut, "stdOut" -> stdOut),
      (command.includeStdErr, "stdErr" -> stdErr)
    ).collect { case (true, entry) => entry }.toMap
    PipelineStepResponse(Some(exitCode), if (secondary.nonEmpty) Some(secondary) else None)
  }

  /** Reads the stream to its end as a String and closes the underlying Source. */
  private def readFully(stream: java.io.InputStream): String = {
    val source = Source.fromInputStream(stream)
    try {
      source.mkString
    } finally {
      source.close()
    }
  }
}

/**
 * Represents a command to be executed.
 *
 * @param command              The command to execute.
 * @param parameters           A list of parameters to pass to the command.
 * @param includeStdOut        Boolean flag indicating whether std out should be returned. Defaults to false.
 * @param includeStdErr        Boolean flag indicating whether std error should be returned. Defaults to false.
 * @param environmentVariables Optional list of environment variables to set, using each entry's name and value.
 *                             This may be ignored by some commands.
 */
case class Command(command: String,
parameters: List[CommandParameter],
includeStdOut: Boolean = false,
includeStdErr: Boolean = false,
environmentVariables: Option[List[CommandParameter]] = None)

/**
 * Describes a single parameter (or environment variable) of a command being executed.
 *
 * @param name     The name of the parameter.
 * @param value    The value that should be presented after the name. If a value isn't required, just use name.
 * @param position Used to ensure the parameters are added in the correct order. Some command steps may
 *                 ignore this.
 * @param nameDash Used to put a dash or double dash in front of name if required.
 */
case class CommandParameter(name: String, value: Option[Any], position: Option[Int] = None, nameDash: Option[String] = None)

0 comments on commit 3150e1d

Please sign in to comment.