Skip to content

Commit

Permalink
Acxiom#345 Updated agent to close jdbc connection cleanly. Updated sl…
Browse files Browse the repository at this point in the history
…f4j to version 1.7.36 to remove conflicts with play and spark. Fixed agent package structure.
  • Loading branch information
dafreels committed Apr 19, 2023
1 parent ac3f787 commit 6dc5aaa
Show file tree
Hide file tree
Showing 20 changed files with 257 additions and 198 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.acxiom.controllers
package com.acxiom.metalus.controllers

import play.api.mvc.{BaseController, ControllerComponents}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.acxiom.controllers
package com.acxiom.metalus.controllers

import play.api.mvc.{Action, AnyContent, BaseController, ControllerComponents}

Expand Down
9 changes: 9 additions & 0 deletions metalus-agent/app/com/acxiom/metalus/utils/AgentModule.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.acxiom.metalus.utils

import com.google.inject.AbstractModule

class AgentModule extends AbstractModule {
override def configure(): Unit = {
bind(classOf[AgentUtils]).asEagerSingleton()
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.acxiom.utils
package com.acxiom.metalus.utils

import com.acxiom.metalus.applications.Application
import com.acxiom.metalus.parser.JsonParser
Expand All @@ -15,7 +15,7 @@ import javax.inject.{Inject, Singleton}
import scala.io.Source

@Singleton
class AgentUtils@Inject()(configuration: Configuration, processUtils: ProcessUtils) {
class AgentUtils @Inject()(configuration: Configuration, processUtils: ProcessUtils) {
lazy val AGENT_ID: String = {
val existingId = System.getenv("AGENT_ID")
if (Option(existingId).isDefined) {
Expand All @@ -25,6 +25,19 @@ class AgentUtils@Inject()(configuration: Configuration, processUtils: ProcessUti
}
}

// See if processes need to be recovered
initialize()

private def initialize(): Unit = {
val processes = processUtils.getCurrentProcessInformation(AGENT_ID)
processes.foreach(process => {
processUtils.checkProcessStatus(process.toProcessInfo) match {
case "RECOVER" => processUtils.recoverProcess(process)
case _ =>
}
})
}

/**
* Given an application, this method will create a jar file that can be added to the classpath.
*
Expand Down Expand Up @@ -53,7 +66,6 @@ class AgentUtils@Inject()(configuration: Configuration, processUtils: ProcessUti
* Generates a classpath for the provided jars. This method will attempt to create/access a cache to increase performance.
*
* @param request The application request to use for the jars.
* @param config The system config used for accessing properties.
* @return A classpath for this request.
*/
def generateClassPath(request: ApplicationRequest): String = {
Expand All @@ -80,7 +92,7 @@ class AgentUtils@Inject()(configuration: Configuration, processUtils: ProcessUti
val cacheFile = new File(cacheDir, s"$cacheName.json")
val lockFile = new File(cacheDir, s"$cacheName.lck")
if (lockFile.exists() && !cacheFile.exists()) {
// Wait until lock is removed
// Wait until lock is removed
val retry = RetryPolicy(None, Some(5), Some(false))
// Wait up to 5 minutes for the lock to be released.
val timeout = configuration.get[Int]("api.context.lock.timeout")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.acxiom.utils
package com.acxiom.metalus.utils

import com.acxiom.metalus.applications.Application

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.acxiom.utils
package com.acxiom.metalus.utils

import java.util.Date

Expand All @@ -10,4 +10,9 @@ case class SessionProcess(sessionId: String,
hostName: String,
exitCode: Int,
startTime: Date,
endTime: Date)
endTime: Date,
command: List[String]) {
def toProcessInfo: ProcessInfo = {
ProcessInfo(agentId, sessionId, processId, hostName, command)
}
}
199 changes: 199 additions & 0 deletions metalus-agent/app/com/acxiom/metalus/utils/ProcessUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package com.acxiom.metalus.utils

import play.api.Configuration
import play.api.inject.ApplicationLifecycle

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.sql.{DriverManager, ResultSet}
import java.util.{Date, Properties}
import javax.inject.{Inject, Singleton}
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

@Singleton
class ProcessUtils @Inject()(configuration: Configuration, lifecycle: ApplicationLifecycle) {
private lazy val connection = {
val properties = new Properties()
if (configuration.get[String]("api.context.db.user").nonEmpty) {
properties.setProperty("user", configuration.get[String]("api.context.db.user"))
properties.setProperty("password", configuration.get[String]("api.context.db.password"))
}
properties.setProperty("driver", configuration.get[String]("api.context.db.driver"))
val conn = DriverManager.getConnection(configuration.get[String]("api.context.db.url"), properties)
// Ensure connection is closed when server shuts down
lifecycle.addStopHook { () =>
Future.successful(conn.close())
}
conn
}

/**
* Returns limited information about this host.
*
* @return Host information
*/
def hostInfo: HostInfo = {
val osBean = ManagementFactory.getOperatingSystemMXBean
HostInfo(osBean.getAvailableProcessors, osBean.getSystemLoadAverage, osBean.getArch)
}

/**
* Called to execute a command and return the process information.
*
* @param commandList The command to execute.
* @param sessionId The current sessionId.
* @param agentId The current agentId.
* @return True if the process data was stored.
*/
def executeCommand(commandList: List[String], sessionId: String, agentId: String): ProcessInfo = {
val processBuilder = new ProcessBuilder(commandList.asJava)
val process = processBuilder.start()
val pid = process.pid()
val processInfo = ProcessInfo(agentId, sessionId, pid, InetAddress.getLocalHost.getHostName, commandList)
// TODO Register the process with the monitoring task
// Serialize the command for storage
val result = serializeCommand(commandList)
val stmt = connection.prepareStatement(
s"""INSERT INTO SESSION_PROCESS
|VALUES('${processInfo.sessionId}', ${processInfo.processId}, '${processInfo.agentId}',
|'${processInfo.hostName}', NULL, ${System.currentTimeMillis()}, NULL, NULL, ?)""".stripMargin)
stmt.setBlob(1, new ByteArrayInputStream(result))
val response = stmt.executeUpdate() == 1
// TODO Handle the response
stmt.close()
processInfo
}

/**
* Called when a process has completed.
*
* @param processInfo The process information.
* @param exitCode The exit code for the process.
* @return true if the information was updated.
*/
def completeProcess(processInfo: ProcessInfo, exitCode: Int): Boolean = {
val stmt = connection.prepareStatement(
s"""UPDATE SESSION_PROCESS
|SET END_TIME = ${System.currentTimeMillis()},
|EXIT_CODE = $exitCode
|WHERE SESSION_ID = '${processInfo.sessionId}'
|AND PROCESS_ID = ${processInfo.processId}""".stripMargin)
val response = stmt.executeUpdate() == 1
stmt.close()
response
}

def recoverProcess(process: SessionProcess): ProcessInfo = {
val stmt = connection.prepareStatement(
s"""INSERT INTO SESSION_PROCESS_HISTORY
|VALUES('${process.sessionId}', ${process.processId}, '${process.agentId}',
|'${process.hostName}', NULL, ${System.currentTimeMillis()}, NULL, NULL, ?)""".stripMargin)
stmt.setBlob(1, new ByteArrayInputStream(serializeCommand(process.command)))
stmt.executeUpdate() == 1
stmt.close()

val delete = connection.prepareStatement(
s"""DELETE FROM SESSION_PROCESS
|WHERE AGENT_ID = '${process.agentId}' AND PROCESS_ID = ${process.processId}
|AND SESSION_ID = '${process.sessionId}'""".stripMargin)
delete.executeUpdate()
delete.close()

executeCommand(process.command, process.sessionId, process.agentId)
}

/**
* Returns information n any "running" processes.
*
* @param agentId The current agent id.
* @return A list of processes that are believed to be running.
*/
def getCurrentProcessInformation(agentId: String): List[SessionProcess] = {
val stmt = connection.prepareStatement(
s"""SELECT * FROM SESSION_PROCESS
|WHERE AGENT_ID = '$agentId' AND EXIT_CODE IS NULL""".stripMargin)
val rs = stmt.executeQuery()
Iterator.from(0).takeWhile(_ => rs.next()).map(_ => createSessionProcessRecord(rs)).toList
}

/**
* Returns information about the provided processss.
*
* @param processId The process id.
* @param agentId The current agent id.
* @return Information about thee process.
*/
def getProcessInfo(processId: Long, agentId: String): Option[SessionProcess] = {
val stmt = connection.prepareStatement(
s"""SELECT * FROM SESSION_PROCESS
|WHERE AGENT_ID = '$agentId' AND PROCESS_ID = $processId""".stripMargin)
val rs = stmt.executeQuery()
if (rs.next()) {
Some(createSessionProcessRecord(rs))
} else {
None
}
}

/**
* Check the status of the provided process information. One of three states will be returned:
*
* RUNNING - The process is found and considered alive.
* FINISHED - The process is found but is not considered alive.
* RECOVER - The process does not exist on this host and should be recovered.
*
* @param processInfo The tracked process information.
* @return One of three states: RUNNING, FINISHED, RECOVER.
*/
def checkProcessStatus(processInfo: ProcessInfo): String = {
val handle = ProcessHandle.of(processInfo.processId)
if (handle.isPresent) {
if (handle.get().isAlive) {
"RUNNING"
} else {
"FINISHED"
}
} else {
"RECOVER"
}
}

private def createSessionProcessRecord(rs: ResultSet) = {
SessionProcess(rs.getString("SESSION_ID"),
rs.getLong("PROCESS_ID"),
rs.getString("AGENT_ID"),
rs.getString("HOSTNAME"),
rs.getInt("EXIT_CODE"),
new Date(rs.getLong("START_TIME")),
new Date(rs.getLong("START_TIME")),
readBlobData(rs))
}

private def serializeCommand(command: List[String]): Array[Byte] = {
val o = new ByteArrayOutputStream()
val out = new ObjectOutputStream(o)
out.writeObject(command)
out.flush()
val result = o.toByteArray
out.close()
result
}

private def readBlobData(results: ResultSet): List[String] = {
val blob = results.getBlob("STATE")
if (Option(blob).isDefined) {
val obj = blob.getBytes(1, blob.length().toInt)
val input = new ByteArrayInputStream(obj)
val stream = new ObjectInputStream(input)
val newObj = stream.readObject()
stream.close()
newObj.asInstanceOf[List[String]]
} else {
None.orNull
}
}
}

case class HostInfo(cpus: Int, avgLoad: Double, arch: String)
Loading

0 comments on commit 6dc5aaa

Please sign in to comment.