diff --git a/online/src/main/scala/ai/chronon/online/Api.scala b/online/src/main/scala/ai/chronon/online/Api.scala
index 55f1c6a96..d01c15caa 100644
--- a/online/src/main/scala/ai/chronon/online/Api.scala
+++ b/online/src/main/scala/ai/chronon/online/Api.scala
@@ -20,6 +20,8 @@ import org.slf4j.LoggerFactory
 import ai.chronon.api.{Constants, StructType}
 import ai.chronon.online.KVStore.{GetRequest, GetResponse, PutRequest}
 import org.apache.spark.sql.SparkSession
+import java.util.Base64
+import java.nio.charset.StandardCharsets
 
 import java.util.function.Consumer
 import scala.collection.Seq
@@ -56,9 +58,7 @@ trait KVStore {
   // helper method to blocking read a string - used for fetching metadata & not in hotpath.
   def getString(key: String, dataset: String, timeoutMillis: Long): Try[String] = {
-    val fetchRequest = KVStore.GetRequest(key.getBytes(Constants.UTF8), dataset)
-    val responseFutureOpt = get(fetchRequest)
-    val response = Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))
+    val response = getResponse(key, dataset, timeoutMillis)
     if (response.values.isFailure) {
       Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get))
     } else {
@@ -66,6 +66,20 @@ trait KVStore {
     }
   }
 
+  def getStringArray(key: String, dataset: String, timeoutMillis: Long): Try[Seq[String]] = {
+    val response = getResponse(key, dataset, timeoutMillis)
+    if (response.values.isFailure) {
+      Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get))
+    } else {
+      Success(StringArrayConverter.bytesToStrings(response.latest.get.bytes))
+    }
+  }
+
+  private def getResponse(key: String, dataset: String, timeoutMillis: Long): GetResponse = {
+    val fetchRequest = KVStore.GetRequest(key.getBytes(Constants.UTF8), dataset)
+    val responseFutureOpt = get(fetchRequest)
+    Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))
+  }
   def get(request: GetRequest): Future[GetResponse] = {
     multiGet(Seq(request))
       .map(_.head)
@@ -85,6 +99,21 @@ trait KVStore {
   }
 }
 
+object StringArrayConverter {
+  @transient lazy val logger = LoggerFactory.getLogger(getClass)
+  // Method to convert an array of strings to a byte array using Base64 encoding for each element
+  def stringsToBytes(strings: Seq[String]): Array[Byte] = {
+    val base64EncodedStrings = strings.map(s => Base64.getEncoder.encodeToString(s.getBytes(StandardCharsets.UTF_8)))
+    base64EncodedStrings.mkString(",").getBytes(StandardCharsets.UTF_8)
+  }
+
+  // Method to convert a byte array back to an array of strings by decoding Base64
+  def bytesToStrings(bytes: Array[Byte]): Seq[String] = {
+    val encodedString = new String(bytes, StandardCharsets.UTF_8)
+    encodedString.split(",").map(s => new String(Base64.getDecoder.decode(s), StandardCharsets.UTF_8))
+  }
+}
+
 /**
   * ==== MUTATION vs. EVENT ====
   * Mutation is the general case of an Event
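For reference, a minimal round-trip sketch of the new converter (the values here are made up); `getStringArray` applies the same decoding to bytes fetched from the KV store:

```scala
import ai.chronon.online.StringArrayConverter

// Hypothetical values: entity paths such as a team index might store.
val names = Seq("joins/team/team.example_join.v1", "joins/team/team.example_join.v2")

// Each element is Base64-encoded before joining on ",", so strings that
// themselves contain commas survive the round trip.
val bytes: Array[Byte] = StringArrayConverter.stringsToBytes(names)
assert(StringArrayConverter.bytesToStrings(bytes) == names)
```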
diff --git a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala
new file mode 100644
index 000000000..102a0eb1f
--- /dev/null
+++ b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala
@@ -0,0 +1,130 @@
+package ai.chronon.online
+
+import ai.chronon.api.ThriftJsonCodec
+import ai.chronon.api
+import org.slf4j.LoggerFactory
+import com.google.gson.Gson
+import org.apache.thrift.TBase
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+import scala.reflect.ClassTag
+
+class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) {
+  @transient implicit lazy val logger = LoggerFactory.getLogger(getClass)
+  private def listFiles(base: File, recursive: Boolean = true): Seq[File] = {
+    if (base.isFile) {
+      Seq(base)
+    } else {
+      val files = base.listFiles
+      val result = files.filter(_.isFile)
+      result ++
+        files
+          .filter(_.isDirectory)
+          .filter(_ => recursive)
+          .flatMap(listFiles(_, recursive))
+    }
+  }
+
+  private def loadJsonToConf[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[T] = {
+    try {
+      val configConf = ThriftJsonCodec.fromJsonFile[T](file, check = true)
+      Some(configConf)
+    } catch {
+      case e: Throwable =>
+        logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}")
+        None
+    }
+  }
+  private def parseName(path: String): Option[String] = {
+    val gson = new Gson()
+    val reader = Files.newBufferedReader(Paths.get(path))
+    try {
+      val map = gson.fromJson(reader, classOf[java.util.Map[String, AnyRef]])
+      Option(map.get("metaData"))
+        .map(_.asInstanceOf[java.util.Map[String, AnyRef]])
+        .map(_.get("name"))
+        .flatMap(Option(_))
+        .map(_.asInstanceOf[String])
+    } catch {
+      case ex: Throwable =>
+        logger.error(s"Failed to parse Chronon config file at $path as JSON", ex)
+        ex.printStackTrace()
+        None
+    }
+  }
+
+  lazy val fileList: Seq[File] = {
+    val configFile = new File(dirPath)
+    assert(configFile.exists(), s"$configFile does not exist")
+    logger.info(s"Uploading Chronon configs from $dirPath")
+    listFiles(configFile)
+  }
+
+  lazy val nonEmptyFileList: Seq[File] = {
+    fileList
+      .filter { file =>
+        val name = parseName(file.getPath)
+        if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}")
+        name.isDefined
+      }
+  }
+
+  /**
+    * Iterate over the list of files and extract the key value pairs for each file
+    * @return Map of endpoint -> (Map of key -> List of values)
+    *         e.g. (
+    *           CHRONON_METADATA_BY_TEAM -> (team -> List("join1", "join2")),
+    *           CHRONON_METADATA -> (teams/joins/join1 -> config1)
+    *         )
+    */
+  def run: Map[String, Map[String, List[String]]] = {
+    nonEmptyFileList.foldLeft(Map.empty[String, Map[String, List[String]]]) { (acc, file) =>
+      // For each end point we apply the extractFn to the file path to extract the key value pair
+      val filePath = file.getPath
+      val optConf =
+        try {
+          filePath match {
+            case value if value.contains("joins/")           => loadJsonToConf[api.Join](filePath)
+            case value if value.contains("group_bys/")       => loadJsonToConf[api.GroupBy](filePath)
+            case value if value.contains("staging_queries/") => loadJsonToConf[api.StagingQuery](filePath)
+          }
+        } catch {
+          case e: Throwable =>
+            logger.error(s"Failed to parse compiled team from file path: $filePath, \nerror=${e.getMessage}")
+            None
+        }
+      if (optConf.isDefined) {
+        val kvPairToEndPoint: List[(String, (String, String))] = metadataEndPointNames.map { endPointName =>
+          val conf = optConf.get
+          val kVPair = filePath match {
+            case value if value.contains("joins/") =>
+              MetadataEndPoint.getEndPoint[api.Join](endPointName).extractFn(filePath, conf.asInstanceOf[api.Join])
+            case value if value.contains("group_bys/") =>
+              MetadataEndPoint
+                .getEndPoint[api.GroupBy](endPointName)
+                .extractFn(filePath, conf.asInstanceOf[api.GroupBy])
+            case value if value.contains("staging_queries/") =>
+              MetadataEndPoint
+                .getEndPoint[api.StagingQuery](endPointName)
+                .extractFn(filePath, conf.asInstanceOf[api.StagingQuery])
+          }
+          (endPointName, kVPair)
+        }
+
+        kvPairToEndPoint
+          .map(kvPair => {
+            val endPoint = kvPair._1
+            val (key, value) = kvPair._2
+            val map = acc.getOrElse(endPoint, Map.empty[String, List[String]])
+            val list = map.getOrElse(key, List.empty[String]) ++ List(value)
+            (endPoint, map.updated(key, list))
+          })
+          .toMap
+      } else {
+        logger.info(s"Skipping invalid file ${file.getPath}")
+        acc
+      }
+    }
+  }
+}
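A usage sketch of the walker (the directory path is hypothetical; the endpoint name is the constant defined in `MetadataEndPoint` below):

```scala
import ai.chronon.online.{MetadataDirWalker, MetadataEndPoint}

// Walk a compiled Chronon config directory and extract key/value pairs
// for the conf-by-key endpoint only.
val walker =
  new MetadataDirWalker("/path/to/compiled/production", List(MetadataEndPoint.ConfByKeyEndPointName))

// endpoint -> (key -> values), e.g.
// ZIPLINE_METADATA -> (joins/team/team.example_join.v1 -> List("<join json>"))
val kvMap: Map[String, Map[String, List[String]]] = walker.run
```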
diff --git a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala
new file mode 100644
index 000000000..68c1eeb3b
--- /dev/null
+++ b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala
@@ -0,0 +1,56 @@
+package ai.chronon.online
+
+import ai.chronon.api.Extensions.StringOps
+import ai.chronon.api.{GroupBy, Join, StagingQuery, ThriftJsonCodec}
+import org.apache.thrift.TBase
+import org.slf4j.LoggerFactory
+
+import scala.reflect.ClassTag
+
+case class MetadataEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag](
+    extractFn: (String, Conf) => (String, String),
+    name: String
+)
+object MetadataEndPoint {
+  @transient implicit lazy val logger = LoggerFactory.getLogger(getClass)
+
+  val ConfByKeyEndPointName = "ZIPLINE_METADATA"
+  val NameByTeamEndPointName = "CHRONON_METADATA_BY_TEAM"
+
+  private def parseTeam[Conf <: TBase[_, _]: Manifest: ClassTag](conf: Conf): String = {
+    conf match {
+      case join: Join                 => "joins/" + join.metaData.team
+      case groupBy: GroupBy           => "group_bys/" + groupBy.metaData.team
+      case stagingQuery: StagingQuery => "staging_queries/" + stagingQuery.metaData.team
+      case _ =>
+        logger.error(s"Failed to parse team from $conf")
+        throw new Exception(s"Failed to parse team from $conf")
+    }
+  }
+
+  // key: entity path, e.g. joins/team/team.example_join.v1
+  // value: entity config in json format
+  private def confByKeyEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag] =
+    new MetadataEndPoint[Conf](
+      extractFn = (path, conf) => (path.confPathToKey, ThriftJsonCodec.toJsonStr(conf)),
+      name = ConfByKeyEndPointName
+    )
+
+  // key: entity type + team name, e.g. joins/team
+  // value: list of entities under the team, e.g. joins/team/team.example_join.v1, joins/team/team.example_join.v2
+  private def NameByTeamEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag] =
+    new MetadataEndPoint[Conf](
+      extractFn = (path, conf) => (parseTeam[Conf](conf), path.confPathToKey),
+      name = NameByTeamEndPointName
+    )
+
+  def getEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag](endPointName: String): MetadataEndPoint[Conf] = {
+    endPointName match {
+      case ConfByKeyEndPointName  => confByKeyEndPoint[Conf]
+      case NameByTeamEndPointName => NameByTeamEndPoint[Conf]
+      case _ =>
+        logger.error(s"Failed to find endpoint for $endPointName")
+        throw new Exception(s"Failed to find endpoint for $endPointName")
+    }
+  }
+}
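To illustrate how the two endpoints shape the same config into different key/value pairs, a short sketch (the path is hypothetical and the join instance is left abstract):

```scala
import ai.chronon.api.Join
import ai.chronon.online.MetadataEndPoint

val path = "joins/team/team.example_join.v1" // hypothetical compiled-config path
val join: Join = ???                         // a parsed join config

// ConfByKey: key = entity path, value = the config serialized to JSON.
val (confKey, confJson) =
  MetadataEndPoint.getEndPoint[Join](MetadataEndPoint.ConfByKeyEndPointName).extractFn(path, join)

// NameByTeam: key = "joins/<team>", value = the entity path.
val (teamKey, entityPath) =
  MetadataEndPoint.getEndPoint[Join](MetadataEndPoint.NameByTeamEndPointName).extractFn(path, join)
```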
diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala
index f89314855..2eb128cfd 100644
--- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala
+++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala
@@ -26,7 +26,6 @@ import org.apache.thrift.TBase
 import java.io.File
 import java.nio.file.{Files, Paths}
 
-import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
 import scala.collection.immutable.SortedMap
 import scala.collection.Seq
 import scala.concurrent.{ExecutionContext, Future}
@@ -39,7 +38,7 @@ case class DataMetrics(series: Seq[(Long, SortedMap[String, Any])])
 class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, timeoutMillis: Long) {
   @transient implicit lazy val logger = LoggerFactory.getLogger(getClass)
   private var partitionSpec = PartitionSpec(format = "yyyy-MM-dd", spanMillis = WindowUtils.Day.millis)
-  private val CONF_BATCH_SIZE = 50
+  private val CONF_BATCH_SIZE = 100
 
   // Note this should match with the format used in the warehouse
   def setPartitionMeta(format: String, spanMillis: Long): Unit = {
@@ -149,88 +148,29 @@
     }, { gb => Metrics.Context(environment = "group_by.serving_info.fetch", groupBy = gb) })
 
-  // upload the materialized JSONs to KV store:
-  // key = // in bytes e.g joins/team/team.example_join.v1 value = materialized json string in bytes
-  def putConf(configPath: String): Future[Seq[Boolean]] = {
-    val configFile = new File(configPath)
-    assert(configFile.exists(), s"$configFile does not exist")
-    logger.info(s"Uploading Chronon configs from $configPath")
-    val fileList = listFiles(configFile)
-
-    val puts = fileList
-      .filter { file =>
-        val name = parseName(file.getPath)
-        if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}")
-        name.isDefined
+  def put(
+      kVPairs: Map[String, Seq[String]],
+      datasetName: String = ChrononMetadataKey,
+      batchSize: Int = CONF_BATCH_SIZE
+  ): Future[Seq[Boolean]] = {
+    val puts = kVPairs.map {
+      case (k, v) => {
+        logger.info(s"""Putting metadata for
+                       |dataset: $datasetName
+                       |key: $k
+                       |conf: $v""".stripMargin)
+        val kBytes = k.getBytes()
+        // if the value is a single string, use it as is; else Base64-encode each string and join them with commas
+        val vBytes = if (v.size == 1) v.head.getBytes() else StringArrayConverter.stringsToBytes(v)
+        PutRequest(keyBytes = kBytes,
+                   valueBytes = vBytes,
+                   dataset = datasetName,
+                   tsMillis = Some(System.currentTimeMillis()))
       }
-      .flatMap { file =>
-        val path = file.getPath
-        val confJsonOpt = path match {
-          case value if value.contains("staging_queries/") => loadJson[StagingQuery](value)
-          case value if value.contains("joins/")           => loadJson[Join](value)
-          case value if value.contains("group_bys/")       => loadJson[GroupBy](value)
-          case _                                           => logger.info(s"unknown config type in file $path"); None
-        }
-        val key = path.confPathToKey
-        confJsonOpt.map { conf =>
-          logger.info(s"""Putting metadata for
-                         |key: $key
-                         |conf: $conf""".stripMargin)
-          PutRequest(keyBytes = key.getBytes(),
-                     valueBytes = conf.getBytes(),
-                     dataset = dataset,
-                     tsMillis = Some(System.currentTimeMillis()))
-        }
-      }
-    val putsBatches = puts.grouped(CONF_BATCH_SIZE).toSeq
-    logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$dataset")
+    }.toSeq
+    val putsBatches = puts.grouped(batchSize).toSeq
+    logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$datasetName")
     val futures = putsBatches.map(batch => kvStore.multiPut(batch))
     Future.sequence(futures).map(_.flatten)
   }
-
-  // list file recursively
-  private def listFiles(base: File, recursive: Boolean = true): Seq[File] = {
-    if (base.isFile) {
-      Seq(base)
-    } else {
-      val files = base.listFiles
-      val result = files.filter(_.isFile)
-      result ++
-        files
-          .filter(_.isDirectory)
-          .filter(_ => recursive)
-          .flatMap(listFiles(_, recursive))
-    }
-  }
-
-  // process chronon configs only. others will be ignored
-  // todo: add metrics
-  private def loadJson[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[String] = {
-    try {
-      val configConf = ThriftJsonCodec.fromJsonFile[T](file, check = true)
-      Some(ThriftJsonCodec.toJsonStr(configConf))
-    } catch {
-      case e: Throwable =>
-        logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}")
-        None
-    }
-  }
-
-  def parseName(path: String): Option[String] = {
-    val gson = new Gson()
-    val reader = Files.newBufferedReader(Paths.get(path))
-    try {
-      val map = gson.fromJson(reader, classOf[java.util.Map[String, AnyRef]])
-      Option(map.get("metaData"))
-        .map(_.asInstanceOf[java.util.Map[String, AnyRef]])
-        .map(_.get("name"))
-        .flatMap(Option(_))
-        .map(_.asInstanceOf[String])
-    } catch {
-      case ex: Throwable =>
-        logger.error(s"Failed to parse Chronon config file at $path as JSON", ex)
-        ex.printStackTrace()
-        None
-    }
-  }
 }
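A sketch of calling the reworked `put` directly (the KV store instance is left abstract; keys and values are made up). Single-element values are written as raw bytes, while multi-element values go through `StringArrayConverter`:

```scala
import ai.chronon.api.Constants.ChrononMetadataKey
import ai.chronon.online.{KVStore, MetadataStore}
import scala.concurrent.Future

val kvStore: KVStore = ??? // some concrete KVStore implementation
val store = new MetadataStore(kvStore, ChrononMetadataKey, timeoutMillis = 10000)

val kvPairs: Map[String, Seq[String]] = Map(
  // single value: stored as-is
  "joins/team/team.example_join.v1" -> Seq("""{"metaData": {"name": "team.example_join.v1"}}"""),
  // multiple values: Base64-encoded and comma-joined
  "joins/team" -> Seq("joins/team/team.example_join.v1", "joins/team/team.example_join.v2")
)
val result: Future[Seq[Boolean]] = store.put(kvPairs, ChrononMetadataKey)
```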
diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala
index e17536c08..5d11e35bb 100644
--- a/spark/src/main/scala/ai/chronon/spark/Driver.scala
+++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -19,7 +19,7 @@ package ai.chronon.spark
 import ai.chronon.api
 import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps, StringOps}
 import ai.chronon.api.ThriftJsonCodec
-import ai.chronon.online.{Api, Fetcher, MetadataStore}
+import ai.chronon.online.{Api, Fetcher, MetadataDirWalker, MetadataEndPoint, MetadataStore}
 import ai.chronon.spark.stats.{CompareBaseJob, CompareJob, ConsistencyJob, SummaryJob}
 import ai.chronon.spark.streaming.{JoinSourceRunner, TopicChecker}
 import com.fasterxml.jackson.databind.ObjectMapper
@@ -41,7 +41,7 @@ import java.io.File
 import java.nio.file.{Files, Paths}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.Await
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration.DurationInt
 import scala.io.Source
 import scala.reflect.ClassTag
@@ -732,8 +732,14 @@ object Driver {
     }
 
     def run(args: Args): Unit = {
-      val putRequest = args.metaDataStore.putConf(args.confPath())
-      val res = Await.result(putRequest, 1.hour)
+      val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName)
+      val dirWalker = new MetadataDirWalker(args.confPath(), acceptedEndPoints)
+      val kvMap: Map[String, Map[String, List[String]]] = dirWalker.run
+      implicit val ec: ExecutionContext = ExecutionContext.global
+      val putRequestsSeq: Seq[Future[scala.collection.Seq[Boolean]]] = kvMap.toSeq.map {
+        case (endPoint, kvMap) => args.metaDataStore.put(kvMap, endPoint)
+      }
+      val res = putRequestsSeq.flatMap(putRequests => Await.result(putRequests, 1.hour))
       logger.info(
         s"Uploaded Chronon Configs to the KV store, success count = ${res.count(v => v)}, failure count = ${res.count(!_)}")
     }
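The driver wires up only the conf-by-key endpoint for now; as a sketch (not part of this diff), the team index could be uploaded as well by extending the accepted endpoint list:

```scala
import ai.chronon.online.{MetadataDirWalker, MetadataEndPoint}

// Walking with both endpoints yields one kvMap entry per endpoint name,
// which the run() loop above would then put into per-endpoint datasets.
val acceptedEndPoints = List(
  MetadataEndPoint.ConfByKeyEndPointName,  // "ZIPLINE_METADATA"
  MetadataEndPoint.NameByTeamEndPointName  // "CHRONON_METADATA_BY_TEAM"
)
val kvMap = new MetadataDirWalker("/path/to/compiled/confs", acceptedEndPoints).run
```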
diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala
index 73770531e..c67bf1670 100644
--- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala
+++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala
@@ -21,11 +21,11 @@ import ai.chronon.aggregator.test.Column
 import ai.chronon.aggregator.windowing.TsUtils
 import ai.chronon.api
 import ai.chronon.api.Constants.ChrononMetadataKey
-import ai.chronon.api.Extensions.{JoinOps, MetadataOps, DerivationOps}
+import ai.chronon.api.Extensions.{DerivationOps, JoinOps, MetadataOps}
 import ai.chronon.api._
 import ai.chronon.online.Fetcher.{Request, Response, StatsRequest}
 import ai.chronon.online.KVStore.GetRequest
-import ai.chronon.online.{JavaRequest, LoggableResponseBase64, MetadataStore, SparkConversions}
+import ai.chronon.online.{JavaRequest, LoggableResponseBase64, MetadataDirWalker, MetadataEndPoint, MetadataStore, SparkConversions, StringArrayConverter}
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.stats.ConsistencyJob
 import ai.chronon.spark.{Join => _, _}
@@ -42,7 +42,7 @@ import java.util.concurrent.Executors
 import scala.collection.Seq
 import scala.compat.java8.FutureConverters
 import scala.concurrent.duration.{Duration, SECONDS}
-import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.io.Source
 import scala.util.ScalaJavaConversions._
@@ -69,13 +69,18 @@ class FetcherTest extends TestCase {
       finally src.close()
     }.replaceAll("\\s+", "")
 
+    val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName)
     val inMemoryKvStore = OnlineUtils.buildInMemoryKVStore("FetcherTest")
     val singleFileDataSet = ChrononMetadataKey + "_single_file_test"
     val singleFileMetadataStore = new MetadataStore(inMemoryKvStore, singleFileDataSet, timeoutMillis = 10000)
     inMemoryKvStore.create(singleFileDataSet)
     // set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing
-    val singleFilePut = singleFileMetadataStore.putConf(confResource.getPath)
-    Await.result(singleFilePut, Duration.Inf)
+    val singleFileDirWalker = new MetadataDirWalker(confResource.getPath, acceptedEndPoints)
+    val singleFileKvMap = singleFileDirWalker.run
+    val singleFilePut: Seq[Future[scala.collection.Seq[Boolean]]] = singleFileKvMap.toSeq.map {
+      case (endPoint, kvMap) => singleFileMetadataStore.put(kvMap, singleFileDataSet)
+    }
+    singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration.Inf))
     val response =
       inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet))
     val res = Await.result(response, Duration.Inf)
     assertTrue(res.latest.isSuccess)
@@ -86,8 +91,12 @@
     val directoryDataSetDataSet = ChrononMetadataKey + "_directory_test"
     val directoryMetadataStore = new MetadataStore(inMemoryKvStore, directoryDataSetDataSet, timeoutMillis = 10000)
     inMemoryKvStore.create(directoryDataSetDataSet)
-    val directoryPut = directoryMetadataStore.putConf(confResource.getPath.replace(s"/$joinPath", ""))
-    Await.result(directoryPut, Duration.Inf)
+    val directoryDataDirWalker = new MetadataDirWalker(confResource.getPath.replace(s"/$joinPath", ""), acceptedEndPoints)
+    val directoryDataKvMap = directoryDataDirWalker.run
+    val directoryPut = directoryDataKvMap.toSeq.map {
+      case (endPoint, kvMap) => directoryMetadataStore.put(kvMap, directoryDataSetDataSet)
+    }
+    directoryPut.flatMap(putRequests => Await.result(putRequests, Duration.Inf))
     val dirResponse = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), directoryDataSetDataSet))
     val dirRes = Await.result(dirResponse, Duration.Inf)