Skip to content

Commit

Permalink
Merge pull request #390 from dafreels/metalus_2_0_0
Browse files Browse the repository at this point in the history
#345 Added ProcessUtils for starting and registering processes.
  • Loading branch information
dafreels committed Apr 18, 2023
2 parents b9358db + 128f062 commit ac3f787
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 79 deletions.
66 changes: 36 additions & 30 deletions metalus-agent/app/com/acxiom/utils/AgentUtils.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package com.acxiom.utils

import com.acxiom.metalus.PipelineException
import com.acxiom.metalus.applications.Application
import com.acxiom.metalus.parser.JsonParser
import com.acxiom.metalus.utils.DriverUtils
import com.acxiom.metalus.{DependencyManager, PipelineException, RetryPolicy}
import play.api.Configuration

import java.io.{File, FileOutputStream, FilenameFilter}
import java.net.InetAddress
import java.nio.file.Files
import java.security.MessageDigest
import java.util.UUID
import java.util.jar.{Attributes, JarEntry, JarOutputStream}
import javax.inject.{Inject, Singleton}
import scala.io.Source
import scala.jdk.CollectionConverters._

object AgentUtils {
@Singleton
class AgentUtils@Inject()(configuration: Configuration, processUtils: ProcessUtils) {
lazy val AGENT_ID: String = {
val existingId = System.getenv("AGENT_ID")
if (Option(existingId).isDefined) {
Expand Down Expand Up @@ -55,7 +56,7 @@ object AgentUtils {
* @param config The system config used for accessing properties.
* @return A classpath for this request.
*/
def generateClassPath(request: ApplicationRequest, config: Configuration): String = {
def generateClassPath(request: ApplicationRequest): String = {
if (!request.resolveClasspath || request.stepLibraries.getOrElse(List()).isEmpty) {
// In this case, add metalus-core which should be local
val jars = new File("/opt/docker/lib/").listFiles(new FilenameFilter() {
Expand All @@ -75,9 +76,24 @@ object AgentUtils {
}.foldLeft("") {
_ + _
}
val cacheDir = config.get[String]("api.context.cache.dir")
val cacheDir = configuration.get[String]("api.context.cache.dir")
val cacheFile = new File(cacheDir, s"$cacheName.json")
// TODO Create a lock for this cache file and then release after the classpath is ready
val lockFile = new File(cacheDir, s"$cacheName.lck")
if (lockFile.exists() && !cacheFile.exists()) {
// Wait until lock is removed
val retry = RetryPolicy(None, Some(5), Some(false))
// Wait up to 5 minutes for the lock to be released.
val timeout = configuration.get[Int]("api.context.lock.timeout")
(1 to timeout).takeWhile(r => {
DriverUtils.invokeWaitPeriod(retry, 0)
lockFile.exists()
})
if (lockFile.exists()) {
lockFile.delete()
}
// Create the lock file
lockFile.createNewFile()
}
if (cacheFile.exists()) {
// Load the classpath from the cache
val source = Source.fromFile(cacheFile)
Expand All @@ -86,23 +102,17 @@ object AgentUtils {
json("classPath").toString
} else {
// Generate the classpath and create the cache file
val jarDir = configuration.get[String]("api.context.jars.dir")
// Make sure the local staging dir is part of the repos
val jarDir = config.get[String]("api.context.jars.dir")
val repos = (request.extraJarRepos.getOrElse(List()) +: jarDir).mkString(",")
val metalusUtils = config.get[String]("api.context.utils.dir")
val command = s"$metalusUtils/bin/dependency-resolver.sh --output-path $jarDir --jar-files $jars --repo $repos --jar-separator :"
val processBuilder = new ProcessBuilder(command)
val process = processBuilder.start()
val exitCode = process.waitFor()
// TODO Need to ensure multiple processes aren't writing to the jarDir at the same time or implement locking in metalus-utils
if (exitCode != 0) {
throw PipelineException(message = Some(s"Failed to build classpath: ${Source.fromInputStream(process.getErrorStream).mkString}"),
pipelineProgress = None)
}
val classPath = Source.fromInputStream(process.getInputStream).getLines().toList.last
val cache = Map[String, String]("classPath" -> classPath, "jars" -> jars, "repos" -> repos)
val parameters = Map[String, Any]("output-path" -> jarDir,
"jar-files" -> jars, "repo" -> repos)
val classpath = DependencyManager.resolveClasspath(parameters)
val cp = classpath.generateClassPath("", parameters.getOrElse("jar-separator", ",").asInstanceOf[String])
val cache = Map[String, String]("classPath" -> cp, "jars" -> jars, "repos" -> repos)
Files.write(cacheFile.toPath, JsonParser.serialize(cache, None).getBytes)
classPath
lockFile.delete()
cp
}
}
}
Expand All @@ -111,18 +121,17 @@ object AgentUtils {
* Builds an execution command based on the provided ApplicationRequest.
*
* @param request The request.
* @param config The system configuration.
* @return The process information needed to track the execution.
*/
def executeRequest(request: ApplicationRequest, config: Configuration): ProcessInfo = {
def executeRequest(request: ApplicationRequest): ProcessInfo = {
// Add the sessionId
val sessionId = request.existingSessionId.getOrElse(UUID.randomUUID().toString)
// Store the Application JSON in a jar file and add to the classpath
val jarDir = config.get[String]("api.context.jars.dir")
val applicationJar = AgentUtils.createApplicationJar(request.application, sessionId, jarDir)
val jarDir = configuration.get[String]("api.context.jars.dir")
val applicationJar = createApplicationJar(request.application, sessionId, jarDir)
val command = List[String]("scala",
"-cp",
s"$applicationJar:${AgentUtils.generateClassPath(request, config)}",
s"$applicationJar:${generateClassPath(request)}",
"com.acxiom.metalus.drivers.DefaultPipelineDriver",
"--executionEngines", request.executions.getOrElse(List[String]("batch")).mkString(","))
// Add parameters
Expand All @@ -132,9 +141,6 @@ object AgentUtils {
}
// Start command and track processId
val commandList = (parameterCommand ::: List("--existingSessionId", sessionId, "--applicationId", sessionId))
val processBuilder = new ProcessBuilder(commandList.asJava)
val process = processBuilder.start()
val pid = process.pid()
ProcessInfo(AgentUtils.AGENT_ID, sessionId, pid, InetAddress.getLocalHost.getHostName, commandList)
processUtils.executeCommand(commandList, sessionId, AGENT_ID)
}
}
10 changes: 10 additions & 0 deletions metalus-agent/app/com/acxiom/utils/ProcessInfo.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
package com.acxiom.utils

import java.util.Date

case class ProcessInfo(agentId: String, sessionId: String, processId: Long, hostName: String, command: List[String])

case class SessionProcess(sessionId: String,
processId: Long,
agentId: String,
hostName: String,
exitCode: Int,
startTime: Date,
endTime: Date)
103 changes: 102 additions & 1 deletion metalus-agent/app/com/acxiom/utils/ProcessUtils.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
package com.acxiom.utils

import play.api.Configuration

import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.sql.{DriverManager, ResultSet}
import java.util.{Date, Properties}
import javax.inject.{Inject, Singleton}
import scala.jdk.CollectionConverters._

@Singleton
class ProcessUtils @Inject()(configuration: Configuration) {
private lazy val connection = {
val properties = new Properties()
if (configuration.get[String]("api.context.db.user").nonEmpty) {
properties.setProperty("user", configuration.get[String]("api.context.db.user"))
properties.setProperty("password", configuration.get[String]("api.context.db.password"))
}
properties.setProperty("driver", configuration.get[String]("api.context.db.driver"))
DriverManager.getConnection(configuration.get[String]("api.context.db.url"), properties)
}

object ProcessUtils {
/**
* Returns limited information about this host.
*
Expand All @@ -12,6 +30,89 @@ object ProcessUtils {
val osBean = ManagementFactory.getOperatingSystemMXBean
HostInfo(osBean.getAvailableProcessors, osBean.getSystemLoadAverage, osBean.getArch)
}

/**
* Called to execute a command and return the process information.
*
* @param commandList The command to execute.
* @param sessionId The current sessionId.
* @param agentId The current agentId.
* @return True if the process data was stored.
*/
def executeCommand(commandList: List[String], sessionId: String, agentId: String): ProcessInfo = {
val processBuilder = new ProcessBuilder(commandList.asJava)
val process = processBuilder.start()
val pid = process.pid()
val processInfo = ProcessInfo(agentId, sessionId, pid, InetAddress.getLocalHost.getHostName, commandList)
// TODO Register the process with the monitoring task
val stmt = connection.prepareStatement(
s"""INSERT INTO SESSION_PROCESS
|VALUES('${processInfo.sessionId}', ${processInfo.processId}, '${processInfo.agentId}',
|'${processInfo.hostName}', NULL, ${System.currentTimeMillis()}, NULL, NULL)""".stripMargin)
val response = stmt.executeUpdate() == 1
// TODO Handle the response
stmt.close()
processInfo
}

/**
* Called when a process has completed.
*
* @param processInfo The process information.
* @param exitCode The exit code for the process.
* @return true if the information was updated.
*/
def completeProcess(processInfo: ProcessInfo, exitCode: Int): Boolean = {
val stmt = connection.prepareStatement(
s"""UPDATE SESSION_PROCESS
|SET END_TIME = ${System.currentTimeMillis()},
|EXIT_CODE = $exitCode
|WHERE SESSION_ID = '${processInfo.sessionId}'
|AND PROCESS_ID = ${processInfo.processId}""".stripMargin)
val response = stmt.executeUpdate() == 1
stmt.close()
response
}

def getCurrentProcessInformation(agentId: String): List[SessionProcess] = {
val stmt = connection.prepareStatement(
s"""SELECT * FROM SESSION_PROCESS
|WHERE AGENT_ID = '$agentId' AND EXIT_CODE IS NULL""".stripMargin)
val rs = stmt.executeQuery()
Iterator.from(0).takeWhile(_ => rs.next()).map(_ => createSessionProcessRecord(rs)).toList
}

def getProcessInfo(processInfo: ProcessInfo, agentId: String): Option[SessionProcess] = {
val stmt = connection.prepareStatement(
s"""SELECT * FROM SESSION_PROCESS
|WHERE AGENT_ID = '$agentId' AND PROCESS_ID = ${processInfo.processId}""".stripMargin)
val rs = stmt.executeQuery()
if (rs.next()) {
Some(createSessionProcessRecord(rs))
} else {
None
}
}

def checkProcessStatus(processInfo: ProcessInfo): Unit = {
// TODO The polling thread will need the Process objects so it can get the exitCode. ProcessHandle cannot do this.
val handle = ProcessHandle.of(processInfo.processId)
if (handle.isPresent) {
handle.get().isAlive
} else {
// TODO I assume this is a recovery scenario
}
}

private def createSessionProcessRecord(rs: ResultSet) = {
SessionProcess(rs.getString("SESSION_ID"),
rs.getLong("PROCESS_ID"),
rs.getString("AGENT_ID"),
rs.getString("HOSTNAME"),
rs.getInt("EXIT_CODE"),
new Date(rs.getLong("START_TIME")),
new Date(rs.getLong("START_TIME")))
}
}

case class HostInfo(cpus: Int, avgLoad: Double, arch: String)
33 changes: 1 addition & 32 deletions metalus-agent/build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import com.typesafe.sbt.packager.docker.DockerChmodType
import com.typesafe.sbt.packager.docker.DockerPermissionStrategy
import com.typesafe.sbt.packager.docker.ExecCmd

name := """metalus-agent"""
organization := "com.acxiom"
Expand All @@ -10,40 +9,12 @@ scalaVersion := "2.12.17"

val metalusVersion = "2.0.0"
val scalaCompat = "2.12"
val metalusUtilsArtifact = s"metalus-utils_$scalaCompat-$metalusVersion.tar.gz"

lazy val metalusUtilsTask = taskKey[Unit]("Download Metalus Utils")
metalusUtilsTask := {
println("Check for existing file")
val existingFile = new File(metalusUtilsArtifact)
if (!existingFile.exists()) {
val localFile = new File(s"../metalus-utils/target/$metalusUtilsArtifact")
println(s"File missing, preparing to copy from ${localFile.getAbsolutePath}")
if (localFile.exists()) {
println(s"copying from ${localFile.absolutePath}")
// Copy the file here
java.nio.file.Files.copy(localFile.toPath, existingFile.toPath)
} else {
// TODO Try to download
// s"https://github.com/Acxiom/metalus/releases/download/$metalusVersion/metalus-utils_$scalaCompat-$metalusVersion.tar.gz"
}
}
}

(Docker / stage) := ((Docker / stage) dependsOn metalusUtilsTask).value
dockerChmodType := DockerChmodType.UserGroupWriteExecute
dockerPermissionStrategy := DockerPermissionStrategy.CopyChown
dockerExposedPorts := Seq(9000)
dockerUpdateLatest := true
dockerBaseImage := "adoptopenjdk:11-jre-hotspot"
Universal / mappings := {
(Universal / mappings).value :+
(file(new File(metalusUtilsArtifact).getAbsolutePath) -> metalusUtilsArtifact)
}
dockerCommands ++= Seq(
ExecCmd("RUN", "tar", "-xf", s"/opt/docker/$metalusUtilsArtifact"),
ExecCmd("RUN", "rm", "-f", s"/opt/docker/$metalusUtilsArtifact")
)

enablePlugins(PlayScala, DockerPlugin, BuildInfoPlugin, ScoverageSbtPlugin)

Expand All @@ -57,14 +28,12 @@ coverageExcludedPackages := "<empty>;.*Reverse.*;.*Routes.*;"
libraryDependencies ++= Seq(
guice,
"com.acxiom" %% "metalus-core" % metalusVersion,
"com.acxiom" %% "metalus-utils" % metalusVersion,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.1.6",
"org.slf4j" % "log4j-over-slf4j" % "1.7.32",
// "org.slf4j" % "slf4j-api" % "2.0.6",
// "ch.qos.logback" % "logback-core" % "1.3.4",
"com.github.tototoshi" %% "play-json4s-native" % "0.10.0",
"com.github.tototoshi" %% "play-json4s-test-native" % "0.10.0" % Test,
"org.scalatestplus.play" %% "scalatestplus-play" % "5.1.0" % Test,
)

resolvers += Resolver.mavenLocal

11 changes: 11 additions & 0 deletions metalus-agent/conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,14 @@ api.context.cache.dir=${?CACHE_DIR}
api.context.jars.dir="/opt/jarDir"
api.context.jars.dir=${?JAR_DIR}
api.context.utils.dir="/opt/docker/metalus-utils"
api.context.lock.timeout=300000
api.context.lock.timeout=${?CLASSPATH_LOCK_TIMEOUT}

api.context.db.url="jdbc:derby:memory:metalus;create=true"
api.context.db.url=${?DB_URL}
api.context.db.driver="org.apache.derby.jdbc.EmbeddedDriver"
api.context.db.driver=${?DB_DRIVER}
api.context.db.user=""
api.context.db.user=${?DB_USER}
api.context.db.password=""
api.context.db.password=${?DB_PASSWORD}
3 changes: 0 additions & 3 deletions metalus-agent/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ Local builds and testing uses docker to generate the container. It is recommende
Minikube is used for deploying the service to kubernetes locally.
* Install [Minikube](https://minikube.sigs.k8s.io/docs/start/)
* Start Minikube: `minikube -p minikube start --driver=docker`
* Setup the Namespace for the service TODO: Where is the common yaml stuff going to be stored?
* Create the _metalus_ namespace once: `kubectl create namespace metalus`
* Set the default Namespace: `kubectl config set-context --current --namespace=metalus`

## Build and Publish locally
* Make sure that containers are published to docker: `eval $(minikube -p minikube docker-env)`
* The build will attempt to pull _metalus-utils_ from the local directory. Next it will attempt to
pull from the _metalus-utils/target_ directory. It is recommended that it be built locally.
* Build and publish locally: `sbt clean docker:publishLocal`

## Deploy locally
Expand Down
11 changes: 4 additions & 7 deletions metalus-core/scripts/session_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,9 @@ CREATE TABLE SESSION_PROCESS
PROCESS_ID INTEGER,
AGENT_ID VARCHAR(64),
HOSTNAME VARCHAR(1024),
EXIT_CODE INTEGER,
EXIT_CODE INTEGER NULL,
START_TIME BIGINT,
END_TIME BIGINT,
DURATION BIGINT,
PRIMARY KEY(SESSION_ID, PROCESS_ID)
END_TIME BIGINT NULL
);

CREATE INDEX sessionProcessIdx ON SESSION_PROCESS (AGENT_ID, SESSION_ID);
Expand All @@ -120,10 +118,9 @@ CREATE TABLE SESSION_PROCESS_HISTORY
PROCESS_ID INTEGER,
AGENT_ID VARCHAR(64),
HOSTNAME VARCHAR(1024),
EXIT_CODE INTEGER,
EXIT_CODE INTEGER NULL,
START_TIME BIGINT,
END_TIME BIGINT,
DURATION BIGINT
END_TIME BIGINT NULL
);

CREATE INDEX sessionProcessHistoryIdx ON SESSION_PROCESS_HISTORY (SESSION_ID);
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package com.acxiom.metalus.utils

import com.acxiom.metalus._
import com.acxiom.metalus.api.HttpRestClient
import com.acxiom.metalus.connectors.{CSVFileDataRowReader, CSVFileDataRowWriter, DataRowReader, DataRowWriter, DataStreamOptions, FileConnector}
import com.acxiom.metalus.connectors.DataStreamOptions
import com.acxiom.metalus.drivers.StreamingDataParser
import com.acxiom.metalus.fs.FileManager
import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings, CsvWriter, CsvWriterSettings, UnescapedQuoteHandling}
import com.univocity.parsers.csv._
import org.slf4j.event.Level

import java.io.OutputStream
Expand Down
Loading

0 comments on commit ac3f787

Please sign in to comment.