Skip to content

Commit

Permalink
Acxiom#345 Created CommandSteps
Browse files Browse the repository at this point in the history
  • Loading branch information
dafreels committed Feb 23, 2023
1 parent 9aec4ea commit 1a09700
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 57 deletions.
41 changes: 21 additions & 20 deletions metalus-core/src/main/scala/com/acxiom/metalus/fs/FileManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@ object FileManager {
val DEFAULT_BUFFER_SIZE: Int = 65536
val DEFAULT_COPY_BUFFER_SIZE: Int = 32768
def apply(): FileManager = new LocalFileManager

def deleteNio(file: File): Unit = {
Files.walkFileTree(file.toPath,
new SimpleFileVisitor[Path] {
override def visitFile(file: Path,
attrs: BasicFileAttributes
): FileVisitResult = {
Files.delete(file)
FileVisitResult.CONTINUE
}

override def postVisitDirectory(dir: Path,
exc: IOException
): FileVisitResult = {
Files.delete(dir)
FileVisitResult.CONTINUE
}
}
)
}
}

/**
Expand Down Expand Up @@ -205,7 +225,7 @@ case class LocalFileResource(file: File) extends FileResource {
*/
override def delete: Boolean = {
if (file.isDirectory) {
deleteNio()
FileManager.deleteNio(file)
true
} else {
file.delete()
Expand Down Expand Up @@ -244,25 +264,6 @@ case class LocalFileResource(file: File) extends FileResource {
case _ => super.copy(destination)
}
}
private def deleteNio(): Unit = {
Files.walkFileTree(file.toPath,
new SimpleFileVisitor[Path] {
override def visitFile(file: Path,
attrs: BasicFileAttributes
): FileVisitResult = {
Files.delete(file)
FileVisitResult.CONTINUE
}

override def postVisitDirectory(dir: Path,
exc: IOException
): FileVisitResult = {
Files.delete(dir)
FileVisitResult.CONTINUE
}
}
)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.acxiom.metalus.steps

import com.acxiom.metalus.{PipelineContext, PipelineException, PipelineStepResponse}
import com.acxiom.metalus.annotations.{StepFunction, StepObject, StepParameter, StepParameters, StepResults}

import scala.io.Source
import scala.jdk.CollectionConverters._

@StepObject
object FunctionSteps {
@StepFunction("d252c782-afbd-4eef-9f59-c2e99677b8e6",
"Execute Local Command",
"Executes the provided command on the local machine",
"Pipeline", "Utilities", List[String]("batch"))
@StepParameters(Map("command" -> StepParameter(None, Some(true), None, None, None, None, Some("The Command object containing the execution information"))))
@StepResults(primaryType = "Int", secondaryTypes = Some(Map("stdOut" -> "String", "stdErr" -> "String")))
def executeCommand(command: Command, pipelineContext: PipelineContext): PipelineStepResponse = {
val parametersList = command.parameters.sortBy(_.position.getOrElse(0)).map(p => {
if (p.value.isDefined) {
s"${p.nameDash.getOrElse("")}${p.name} ${p.value.get}"
} else {
s"${p.nameDash.getOrElse("")}${p.name}"
}
})
val commandList = List(command.command) ::: parametersList
val processBuilder = new ProcessBuilder(commandList.asJava)
// Set the environment variables
command.environmentVariables.getOrElse(List())
.foreach(env => processBuilder.environment().put(env.name, env.value.getOrElse("").toString))
// Start the process
val process = processBuilder.start()
// Wait until the process completes
val exitCode = process.waitFor()
if (exitCode != 0) {
throw PipelineException(message =
Some(Source.fromInputStream(process.getErrorStream).mkString),
pipelineProgress = pipelineContext.currentStateInfo)
}
// Map the secondary returns
val m = Map[String, Any]()
val m1 = if (command.includeStdOut) {
m + ("stdOut" -> Source.fromInputStream(process.getInputStream).mkString)
} else {
m
}
val m2 = if (command.includeStdErr) {
m1 + ("stdErr" -> Source.fromInputStream(process.getErrorStream).mkString)
} else {
m1
}
val secondary = if (m2.nonEmpty) {
Some(m2)
} else {
None
}
PipelineStepResponse(Some(exitCode), secondary)
}
}

/**
* Represents a comman d to be executed.
*
* @param command The name of the function to execute.
* @param parameters A list of parameters to pass to the function.
* @param includeStdOut Boolean flag indicating whether std out should be returned.
* @param includeStdErr Boolean flag indicating whether std error should be returned.
* @param environmentVariables Optional list of environment variables to set. This may be ignored by some commands.
*/
case class Command(command: String,
parameters: List[CommandParameter],
includeStdOut: Boolean = false,
includeStdErr: Boolean = false,
environmentVariables: Option[List[CommandParameter]] = None)

/**
* Contains information related to a command being executed.
*
* @param name This contains the parameter information.
* @param value This contains the value that should be presented after the name. If a value isn't required, just use name.
* @param position This used to ensure the parameters are added in the correct order. Some command steps may ignore this.
* @param nameDash Used to put a dash or double dash in front of name if required.
*/
case class CommandParameter(name: String, value: Option[Any], position: Option[Int] = None, nameDash: Option[String] = None)
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package com.acxiom.metalus.steps

import com.acxiom.metalus.fs.FileManager
import com.acxiom.metalus.{Constants, PipelineException, TestHelper}
import com.acxiom.metalus.parser.JsonParser
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funspec.AnyFunSpec

import java.nio.file.Files

class FunctionStepsTests extends AnyFunSpec with BeforeAndAfterAll {
private val tempDir = Files.createTempDirectory("functiontest")
override def beforeAll(): Unit = {
tempDir.toFile.deleteOnExit()
Files.createTempFile(tempDir, "testFile1", ".txt")
Files.createTempFile(tempDir, "testFile2", ".txt")
}

override def afterAll(): Unit = {
FileManager.deleteNio(tempDir.toFile)
}

describe("FunctionSteps") {
describe("executeCommand") {
it("should fail to execute command and return output and error") {
val commandJson =
s"""
|{
| "command": "ls",
| "includeStdOut": true,
| "includeStdErr": true,
| "parameters": [
| {
| "name": "lh",
| "nameDash": "-",
| "position": 1
| },
| {
| "name": "/badDir",
| "position": 2
| }
| ]
|}""".stripMargin
val command = JsonParser.parseJson(commandJson, "com.acxiom.metalus.steps.Command").asInstanceOf[Command]

val thrown = intercept[PipelineException] {
FunctionSteps.executeCommand(command, TestHelper.generatePipelineContext())
}
assert(thrown.getMessage.startsWith("ls: /badDir: No such file or directory"))
}

it("should execute command and return output and error") {
val commandJson =
s"""
|{
| "command": "ls",
| "includeStdOut": true,
| "includeStdErr": true,
| "parameters": [
| {
| "name": "lh",
| "nameDash": "-",
| "position": 1
| },
| {
| "name": "$tempDir",
| "position": 2
| }
| ]
|}""".stripMargin
val command = JsonParser.parseJson(commandJson, "com.acxiom.metalus.steps.Command").asInstanceOf[Command]
val response = FunctionSteps.executeCommand(command, TestHelper.generatePipelineContext())
assert(response.primaryReturn.isDefined)
assert(response.primaryReturn.get.isInstanceOf[Int])
assert(response.primaryReturn.get.asInstanceOf[Int] == 0)
assert(response.namedReturns.isDefined)
assert(response.namedReturns.get.size == 2)
assert(response.namedReturns.get.contains("stdOut"))
assert(response.namedReturns.get.contains("stdErr"))
assert(response.namedReturns.get("stdErr").toString.isEmpty)
val stdOut = response.namedReturns.get("stdOut").toString
assert(stdOut.contains("testFile1"))
assert(stdOut.contains("testFile2"))
assert(stdOut.contains("total 0"))
assert(stdOut.split("\n").length == Constants.THREE)
}

it("should execute command and return error") {
val commandJson =
s"""
|{
| "command": "ls",
| "includeStdOut": false,
| "includeStdErr": true,
| "parameters": [
| {
| "name": "lh",
| "nameDash": "-",
| "position": 1
| },
| {
| "name": "$tempDir",
| "position": 2
| }
| ]
|}""".stripMargin
val command = JsonParser.parseJson(commandJson, "com.acxiom.metalus.steps.Command").asInstanceOf[Command]
val response = FunctionSteps.executeCommand(command, TestHelper.generatePipelineContext())
assert(response.primaryReturn.isDefined)
assert(response.primaryReturn.get.isInstanceOf[Int])
assert(response.primaryReturn.get.asInstanceOf[Int] == 0)
assert(response.namedReturns.isDefined)
assert(response.namedReturns.get.size == 1)
assert(response.namedReturns.get.contains("stdErr"))
assert(response.namedReturns.get("stdErr").toString.isEmpty)
}

it("should execute command and return output") {
val commandJson =
s"""
|{
| "command": "ls",
| "includeStdOut": true,
| "includeStdErr": false,
| "parameters": [
| {
| "name": "lh",
| "nameDash": "-",
| "position": 1
| },
| {
| "name": "$tempDir",
| "position": 2
| }
| ]
|}""".stripMargin
val command = JsonParser.parseJson(commandJson, "com.acxiom.metalus.steps.Command").asInstanceOf[Command]
val response = FunctionSteps.executeCommand(command, TestHelper.generatePipelineContext())
assert(response.primaryReturn.isDefined)
assert(response.primaryReturn.get.isInstanceOf[Int])
assert(response.primaryReturn.get.asInstanceOf[Int] == 0)
assert(response.namedReturns.isDefined)
assert(response.namedReturns.get.size == 1)
assert(response.namedReturns.get.contains("stdOut"))
val stdOut = response.namedReturns.get("stdOut").toString
assert(stdOut.contains("testFile1"))
assert(stdOut.contains("testFile2"))
assert(stdOut.contains("total 0"))
assert(stdOut.split("\n").length == Constants.THREE)
}

it("should execute command and return no secondary response") {
val commandJson =
s"""
|{
| "command": "ls",
| "includeStdOut": false,
| "includeStdErr": false,
| "parameters": [
| {
| "name": "lh",
| "nameDash": "-",
| "position": 1
| },
| {
| "name": "$tempDir",
| "position": 2
| }
| ]
|}""".stripMargin
val command = JsonParser.parseJson(commandJson, "com.acxiom.metalus.steps.Command").asInstanceOf[Command]
val response = FunctionSteps.executeCommand(command, TestHelper.generatePipelineContext())
assert(response.primaryReturn.isDefined)
assert(response.primaryReturn.get.isInstanceOf[Int])
assert(response.primaryReturn.get.asInstanceOf[Int] == 0)
assert(response.namedReturns.isEmpty)
}
}
}
}
37 changes: 0 additions & 37 deletions metalus-spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@
<properties>
<spark.compat.version>3.1</spark.compat.version>
<spark.version>3.1.3</spark.version>
<json4s.version>3.6.6</json4s.version>
<mongo.version>3.0.1</mongo.version>
<scala.compat.version>2.12</scala.compat.version>
<scala.version>2.12.13</scala.version>
Expand Down Expand Up @@ -240,7 +239,6 @@
<properties>
<spark.compat.version>3.2</spark.compat.version>
<spark.version>3.2.1</spark.version>
<json4s.version>3.7.0-M11</json4s.version>
<mongo.version>3.0.2</mongo.version>
<scala.compat.version>2.12</scala.compat.version>
<scala.version>2.12.13</scala.version>
Expand All @@ -252,11 +250,6 @@
<hadoop.minicluster.version>3.3.1</hadoop.minicluster.version>
</properties>
<dependencies>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-ast_${scala.compat.version}</artifactId>
<version>${json4s.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
Expand Down Expand Up @@ -305,7 +298,6 @@
<properties>
<spark.compat.version>3.3</spark.compat.version>
<spark.version>3.3.0</spark.version>
<json4s.version>3.7.0-M11</json4s.version>
<mongo.version>3.0.2</mongo.version>
<scala.compat.version>2.12</scala.compat.version>
<scala.version>2.12.13</scala.version>
Expand All @@ -317,35 +309,6 @@
<hadoop.minicluster.version>3.3.3</hadoop.minicluster.version>
</properties>
<dependencies>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-ast_${scala.compat.version}</artifactId>
<version>${json4s.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>2.17.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
Expand Down

0 comments on commit 1a09700

Please sign in to comment.