Skip to content

Commit

Permalink
#345 Forced all JSON parsing through a single object
Browse files Browse the repository at this point in the history
  • Loading branch information
dafreels committed Jan 18, 2023
1 parent 400cb54 commit 5d0cebb
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 128 deletions.
@@ -1,7 +1,7 @@
package com.acxiom.metalus

import com.acxiom.metalus.applications.Json4sSerializers
import com.acxiom.metalus.context.Json4sContext
import com.acxiom.metalus.parser.JsonParser
import com.acxiom.metalus.utils.{ReflectionUtils, ScalaScriptEngine}
import org.apache.log4j.Logger

Expand Down Expand Up @@ -268,16 +268,15 @@ trait PipelineStepMapper {
parameter: Parameter,
pipelineContext: PipelineContext): Option[Any] = {
val workingMap = map.asInstanceOf[Map[String, Any]]
val jsonContext = pipelineContext.contextManager.getContext("json").get.asInstanceOf[Json4sContext]
val paramSerializers = parameter.json4sSerializers
Some(if (parameter.className.isDefined && parameter.className.get.nonEmpty) {
// Skip the embedded variable mapping if this is a step-group pipeline parameter
// TODO [2.0 Review] Pipeline.category has been removed
if (workingMap.getOrElse("category", "pipeline").asInstanceOf[String] == "step-group") {
jsonContext.parseJson(jsonContext.serializeJson(workingMap), parameter.className.get, paramSerializers)
JsonParser.parseJson(JsonParser.serialize(workingMap), parameter.className.get, paramSerializers)
} else {
jsonContext.parseJson(
jsonContext.serializeJson(mapEmbeddedVariables(workingMap, pipelineContext, paramSerializers), paramSerializers),
JsonParser.parseJson(
JsonParser.serialize(mapEmbeddedVariables(workingMap, pipelineContext, paramSerializers), paramSerializers),
parameter.className.get, paramSerializers)
}
} else {
Expand All @@ -296,12 +295,11 @@ trait PipelineStepMapper {
*/
private def handleListParameter(list: List[_], parameter: Parameter, pipelineContext: PipelineContext): Option[Any] = {
val dropNone = pipelineContext.getGlobalAs[Boolean]("dropNoneFromLists").getOrElse(true)
val jsonContext = pipelineContext.contextManager.getContext("json").get.asInstanceOf[Json4sContext]
val paramSerializers = parameter.json4sSerializers
Some(if (parameter.className.isDefined && parameter.className.get.nonEmpty) {
list.map(value =>
jsonContext.parseJson(
jsonContext.serializeJson(mapEmbeddedVariables(value.asInstanceOf[Map[String, Any]],
JsonParser.parseJson(
JsonParser.serialize(mapEmbeddedVariables(value.asInstanceOf[Map[String, Any]],
pipelineContext, paramSerializers)), parameter.className.get, paramSerializers))
} else if (list.nonEmpty && list.head.isInstanceOf[Map[_, _]]) {
list.map(value => {
Expand Down Expand Up @@ -336,14 +334,13 @@ trait PipelineStepMapper {
private[metalus] def mapEmbeddedVariables(classMap: Map[String, Any],
pipelineContext: PipelineContext,
json4sSerializers: Option[Json4sSerializers]): Map[String, Any] = {
val jsonContext = pipelineContext.contextManager.getContext("json").get.asInstanceOf[Json4sContext]
classMap.foldLeft(classMap)((map, entry) => {
entry._2 match {
case s: String if containsSpecialCharacters(s) =>
map + (entry._1 -> getBestValue(s.split("\\|\\|"), Parameter(), pipelineContext))
case m: Map[String, Any] if m.contains("className")=>
map + (entry._1 -> jsonContext.parseJson(
jsonContext.serializeJson(
map + (entry._1 -> JsonParser.parseJson(
JsonParser.serialize(
mapEmbeddedVariables(m("object").asInstanceOf[Map[String, Any]], pipelineContext, json4sSerializers)),
m("className").asInstanceOf[String]))
case m: Map[_, _] =>
Expand Down
Expand Up @@ -5,8 +5,6 @@ import com.acxiom.metalus.context.Json4sContext
import com.acxiom.metalus.parser.JsonParser
import com.acxiom.metalus.utils.ReflectionUtils
import org.apache.log4j.Logger
import org.json4s.Formats
import org.json4s.native.Serialization

/**
* Provides a set of utility functions for working with Application metadata
Expand Down Expand Up @@ -37,17 +35,16 @@ object ApplicationUtils {
val validateArgumentTypes = parameters.getOrElse(Map()).getOrElse("validateStepParameterTypes", false).asInstanceOf[Boolean]
// Create the ContextManager
val contextManager = new ContextManager(application.contexts.getOrElse(Map()), parameters.getOrElse(Map()))
val jsonContext = contextManager.getContext("json").asInstanceOf[Option[Json4sContext]].get
implicit val formats: Formats = jsonContext.generateFormats(None)
val tempCtx = PipelineContext(globals, List(), contextManager = contextManager)
val globalStepMapper = generateStepMapper(application.stepMapper, Some(PipelineStepMapper()),
validateArgumentTypes, credentialProvider)
validateArgumentTypes, credentialProvider, tempCtx)
val rootGlobals = globals.getOrElse(Map[String, Any]()) // Create the default globals
val globalListener = generatePipelineListener(application.pipelineListener, Some(pipelineListener),
validateArgumentTypes, credentialProvider)
validateArgumentTypes, credentialProvider, tempCtx)
val globalPipelineParameters = generatePipelineParameters(application.pipelineParameters, Some(List[PipelineParameter]()))
val pipelineManager = generatePipelineManager(application.pipelineManager,
Some(PipelineManager(application.pipelineTemplates)),
validateArgumentTypes, credentialProvider).get
validateArgumentTypes, credentialProvider, tempCtx).get
val initialContext = PipelineContext(Some(rootGlobals), globalPipelineParameters.get, application.stepPackages,
globalStepMapper.get, globalListener, List(), pipelineManager, credentialProvider, contextManager, Map(), None)

Expand Down Expand Up @@ -82,10 +79,11 @@ object ApplicationUtils {
private def generatePipelineManager(pipelineManagerInfo: Option[ClassInfo],
pipelineManager: Option[PipelineManager],
validateArgumentTypes: Boolean,
credentialProvider: Option[CredentialProvider])(implicit formats: Formats): Option[PipelineManager] = {
credentialProvider: Option[CredentialProvider],
pipelineContext: PipelineContext): Option[PipelineManager] = {
if (pipelineManagerInfo.isDefined && pipelineManagerInfo.get.className.isDefined) {
Some(ReflectionUtils.loadClass(pipelineManagerInfo.get.className.getOrElse("com.acxiom.metalus.CachedPipelineManager"),
Some(parseParameters(pipelineManagerInfo.get, credentialProvider)), validateArgumentTypes).asInstanceOf[PipelineManager])
Some(parseParameters(pipelineManagerInfo.get, credentialProvider, pipelineContext)), validateArgumentTypes).asInstanceOf[PipelineManager])
} else {
pipelineManager
}
Expand All @@ -94,10 +92,11 @@ object ApplicationUtils {
private def generatePipelineListener(pipelineListenerInfo: Option[ClassInfo],
pipelineListener: Option[PipelineListener],
validateArgumentTypes: Boolean,
credentialProvider: Option[CredentialProvider])(implicit formats: Formats): Option[PipelineListener] = {
credentialProvider: Option[CredentialProvider],
pipelineContext: PipelineContext): Option[PipelineListener] = {
if (pipelineListenerInfo.isDefined && pipelineListenerInfo.get.className.isDefined) {
Some(ReflectionUtils.loadClass(pipelineListenerInfo.get.className.getOrElse("com.acxiom.metalus.DefaultPipelineListener"),
Some(parseParameters(pipelineListenerInfo.get, credentialProvider)), validateArgumentTypes).asInstanceOf[PipelineListener])
Some(parseParameters(pipelineListenerInfo.get, credentialProvider, pipelineContext)), validateArgumentTypes).asInstanceOf[PipelineListener])
} else {
pipelineListener
}
Expand All @@ -106,10 +105,11 @@ object ApplicationUtils {
private def generateStepMapper(stepMapperInfo: Option[ClassInfo],
stepMapper: Option[PipelineStepMapper],
validateArgumentTypes: Boolean,
credentialProvider: Option[CredentialProvider])(implicit formats: Formats): Option[PipelineStepMapper] = {
credentialProvider: Option[CredentialProvider],
pipelineContext: PipelineContext): Option[PipelineStepMapper] = {
if (stepMapperInfo.isDefined && stepMapperInfo.get.className.isDefined) {
Some(ReflectionUtils.loadClass(stepMapperInfo.get.className.getOrElse("com.acxiom.metalus.DefaultPipelineStepMapper"),
Some(parseParameters(stepMapperInfo.get, credentialProvider)), validateArgumentTypes).asInstanceOf[PipelineStepMapper])
Some(parseParameters(stepMapperInfo.get, credentialProvider, pipelineContext)), validateArgumentTypes).asInstanceOf[PipelineStepMapper])
} else {
stepMapper
}
Expand All @@ -128,9 +128,9 @@ object ApplicationUtils {
rootGlobals: Map[String, Any],
defaultGlobals: Option[Map[String, Any]],
pipelineContext: PipelineContext,
merge: Boolean = false)(implicit formats: Formats): Option[Map[String, Any]] = {
merge: Boolean = false): Option[Map[String, Any]] = {
globals.map { baseGlobals =>
val result = baseGlobals.foldLeft(rootGlobals)((rootMap, entry) => parseValue(rootMap, entry._1, entry._2, Some(pipelineContext)))
val result = baseGlobals.foldLeft(rootGlobals)((rootMap, entry) => parseValue(rootMap, entry._1, entry._2, pipelineContext))
if (merge) {
defaultGlobals.getOrElse(Map[String, Any]()) ++ result
} else {
Expand All @@ -139,9 +139,10 @@ object ApplicationUtils {
}.orElse(defaultGlobals)
}

private def parseParameters(classInfo: ClassInfo, credentialProvider: Option[CredentialProvider])(implicit formats: Formats): Map[String, Any] = {
private def parseParameters(classInfo: ClassInfo, credentialProvider: Option[CredentialProvider], pipelineContext: PipelineContext): Map[String, Any] = {
classInfo.parameters.getOrElse(Map[String, Any]())
.foldLeft(Map[String, Any]("credentialProvider" -> credentialProvider))((rootMap, entry) => parseValue(rootMap, entry._1, entry._2))
.foldLeft(Map[String, Any]("credentialProvider" -> credentialProvider))((rootMap, entry) =>
parseValue(rootMap, entry._1, entry._2, pipelineContext))
}

/**
Expand All @@ -151,36 +152,40 @@ object ApplicationUtils {
* @param key The key to use when adding teh result to the rootMap
* @param value The value to be parsed
* @param ctx The PipelineContext that will provide the mapper
* @param formats Implicit formats used for JSON conversion
* @return A map containing the converted value
*/
def parseValue(rootMap: Map[String, Any], key: String, value: Any, ctx: Option[PipelineContext] = None)(implicit formats: Formats): Map[String, Any] = {
def parseValue(rootMap: Map[String, Any], key: String, value: Any, ctx: PipelineContext): Map[String, Any] = {
val jsonContext = ctx.contextManager.getContext("json").asInstanceOf[Option[Json4sContext]].get
value match {
case map: Map[String, Any] if map.contains("className") =>
val mapEmbedded = map.get("mapEmbeddedVariables").exists(_.toString.toBoolean) && ctx.isDefined
val mapEmbedded = map.get("mapEmbeddedVariables").exists(_.toString.toBoolean)
val finalMap = if (mapEmbedded) {
ctx.get.parameterMapper.mapEmbeddedVariables(map("object").asInstanceOf[Map[String, Any]], ctx.get, None)
ctx.parameterMapper.mapEmbeddedVariables(map("object").asInstanceOf[Map[String, Any]], ctx, None)
} else {
map("object").asInstanceOf[Map[String, Any]]
}
val obj = JsonParser.parseJson(Serialization.write(finalMap), map("className").asInstanceOf[String])
val obj = JsonParser.parseJson(
JsonParser.serialize(finalMap, jsonContext.serializers),
map("className").asInstanceOf[String], jsonContext.serializers)
rootMap + (key -> obj)
case listMap: List[Any] =>
val obj = listMap.map {
case m: Map[String, Any] =>
if (m.contains("className")) {
val mapEmbedded = m.get("mapEmbeddedVariables").exists(_.toString.toBoolean) && ctx.isDefined
val mapEmbedded = m.get("mapEmbeddedVariables").exists(_.toString.toBoolean)
val map = if (m.contains("parameters")) {
m("parameters").asInstanceOf[Map[String, Any]]
} else {
m("object").asInstanceOf[Map[String, Any]]
}
val finalMap = if (mapEmbedded) {
ctx.get.parameterMapper.mapEmbeddedVariables(map, ctx.get, None)
ctx.parameterMapper.mapEmbeddedVariables(map, ctx, None)
} else {
map
}
JsonParser.parseJson(Serialization.write(finalMap), m("className").asInstanceOf[String])
JsonParser.parseJson(
JsonParser.serialize(finalMap, jsonContext.serializers),
m("className").asInstanceOf[String], jsonContext.serializers)
} else {
m
}
Expand Down
Expand Up @@ -2,6 +2,7 @@ package com.acxiom.metalus.context

import com.acxiom.metalus.Context
import com.acxiom.metalus.applications.Json4sSerializers
import com.acxiom.metalus.parser.JsonParser
import com.acxiom.metalus.parser.JsonParser.StepSerializer
import com.acxiom.metalus.utils.ReflectionUtils
import org.json4s.ext.{EnumNameSerializer, EnumSerializer}
Expand All @@ -17,71 +18,12 @@ import org.json4s.{CustomSerializer, DefaultFormats, Extraction, Formats, FullTy
* @param jsonSerializers Contains ClassInfo objects for custom serializers and enum serializers.
*/
class Json4sContext(jsonSerializers: Option[Map[String, Any]] = None) extends Context {
private val localSerializers = {
val serializers: Option[Json4sSerializers] = {
if (jsonSerializers.isDefined) {
val jsonString = serializeJson(jsonSerializers.get)
Some(parseJson(jsonString, "com.acxiom.metalus.applications.Json4sSerializers").asInstanceOf[Json4sSerializers])
val jsonString = JsonParser.serialize(jsonSerializers.get)
Some(JsonParser.parseJson(jsonString, "com.acxiom.metalus.applications.Json4sSerializers").asInstanceOf[Json4sSerializers])
} else {
None
}
}

/**
* Parse the provided JSON string into an object of the provided class name.
*
* @param json The JSON string to parse.
* @param className The fully qualified name of the class.
* @return An instantiation of the class from the provided JSON.
*/
def parseJson(json: String, className: String, serializers: Option[Json4sSerializers] = None): Any = {
implicit val formats: Formats = generateFormats(serializers)
val clazz = Class.forName(className)
val scalaType = Reflector.scalaTypeOf(clazz)
Extraction.extract(parse(json), scalaType)
}

/**
* Convert the provided obj into a JSON string.
* @param obj The object to convert.
* @return A JSON string representation of the object.
*/
def serializeJson(obj: Any, serializers: Option[Json4sSerializers] = None): String = {
implicit val formats: Formats = generateFormats(serializers)
Serialization.write(obj)
}

def generateFormats(json4sSerializers: Option[Json4sSerializers]): Formats = {
getDefaultSerializers(json4sSerializers).map { j =>
val enumNames = j.enumNameSerializers.map(_.map(ci => new EnumNameSerializer(ReflectionUtils.loadEnumeration(ci.className.getOrElse("")))))
.getOrElse(List())
val enumIds = j.enumIdSerializers.map(_.map(ci => new EnumSerializer(ReflectionUtils.loadEnumeration(ci.className.getOrElse("")))))
.getOrElse(List())
val customSerializers = j.customSerializers.map(_.map { ci =>
ReflectionUtils.loadClass(ci.className.getOrElse(""), ci.parameters).asInstanceOf[CustomSerializer[_]]
}).getOrElse(List())
val baseFormats: Formats = if (j.hintSerializers.isDefined && j.hintSerializers.get.nonEmpty) {
Serialization.formats(FullTypeHints(
j.hintSerializers.map(_.map { hint => Class.forName(hint.className.getOrElse("")) }).get))
} else {
DefaultFormats
}
(customSerializers ++ enumNames ++ enumIds).foldLeft(baseFormats: Formats) { (formats, custom) =>
formats + custom
}
}.getOrElse(DefaultFormats) + new StepSerializer
}

/**
* This method is responsible for ensuring that there is always a set of serializers or None.
* @param serializers The serializerrs to verify.
* @return An option to use when generrating formats
*/
private def getDefaultSerializers(serializers: Option[Json4sSerializers]) =
if (serializers.isDefined) {
serializers
} else if (Option(localSerializers).isDefined) {
localSerializers
} else {
None
}
}
Expand Up @@ -3,7 +3,6 @@ package com.acxiom.metalus.flow
import com.acxiom.metalus._
import com.acxiom.metalus.applications.ApplicationUtils
import com.acxiom.metalus.audits.{AuditType, ExecutionAudit}
import com.acxiom.metalus.context.Json4sContext

import scala.runtime.BoxedUnit

Expand Down Expand Up @@ -60,7 +59,6 @@ case class StepGroupFlow(pipeline: Pipeline,
private def preparePipelineContext(parameterValues: Map[String, Any],
pipelineContext: PipelineContext,
subPipeline: Pipeline): PipelineContext = {
implicit val formats = pipelineContext.contextManager.getContext("json").get.asInstanceOf[Json4sContext].generateFormats(None)
val updates = if (subPipeline.parameters.isDefined &&
subPipeline.parameters.get.inputs.isDefined &&
subPipeline.parameters.get.inputs.get.nonEmpty) {
Expand All @@ -70,11 +68,11 @@ case class StepGroupFlow(pipeline: Pipeline,
if (parameterValues.contains(input.name)) {
val paramVals = parameterValues - input.name
if (input.global) {
(ApplicationUtils.parseValue(tuple._1, input.name, parameterValues(input.name), Some(pipelineContext)),
(ApplicationUtils.parseValue(tuple._1, input.name, parameterValues(input.name), pipelineContext),
tuple._2, paramVals)
} else {
(tuple._1,
tuple._2.copy(parameters = ApplicationUtils.parseValue(tuple._2.parameters, input.name, parameterValues(input.name), Some(pipelineContext))),
tuple._2.copy(parameters = ApplicationUtils.parseValue(tuple._2.parameters, input.name, parameterValues(input.name), pipelineContext)),
paramVals)
}
} else {
Expand Down

0 comments on commit 5d0cebb

Please sign in to comment.