From fe938153364dfb4477fa5fedbe6efb6a9eed2ce9 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 6 May 2024 14:30:46 -0700 Subject: [PATCH 01/33] initial code --- .../scala/ai/chronon/api/Extensions.scala | 55 +++++++++++- .../ai/chronon/online/MetadataDirWalker.scala | 86 +++++++++++++++++++ .../ai/chronon/online/MetadataEndPoint.scala | 25 ++++++ .../ai/chronon/online/MetadataStore.scala | 52 ++++++----- .../main/scala/ai/chronon/spark/Driver.scala | 23 ++--- 5 files changed, 208 insertions(+), 33 deletions(-) create mode 100644 online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala create mode 100644 online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala diff --git a/api/src/main/scala/ai/chronon/api/Extensions.scala b/api/src/main/scala/ai/chronon/api/Extensions.scala index 9f0acac8b..ccc307e1b 100644 --- a/api/src/main/scala/ai/chronon/api/Extensions.scala +++ b/api/src/main/scala/ai/chronon/api/Extensions.scala @@ -23,11 +23,13 @@ import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import org.apache.spark.sql.Column import org.apache.spark.sql.functions.expr +import org.apache.thrift.TBase import java.io.{PrintWriter, StringWriter} import java.util import java.util.regex.Pattern import scala.collection.{Seq, mutable} +import scala.reflect.ClassTag import scala.util.ScalaJavaConversions.{IteratorOps, ListOps, MapOps} import scala.util.{Failure, Success, Try} @@ -634,11 +636,62 @@ object Extensions { def sanitize: String = Option(string).map(_.replaceAll("[^a-zA-Z0-9_]", "_")).orNull def cleanSpec: String = string.split("/").head + } + implicit class filePathOps(filePath: String) { + @transient lazy val logger = LoggerFactory.getLogger(getClass) + // process chronon configs only. 
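// i.e. files whose paths contain joins/, group_bys/ or staging_queries/;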
others will be ignored + // todo: add metrics + + private def loadJsonToConf[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[T] = { + try { + val configConf = ThriftJsonCodec.fromJsonFile[T](file, check = true) + Some(configConf) + } catch { + case e: Throwable => + logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}") + None + } + } + private def loadJson[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[String] = { + try { + val configConf = loadJsonToConf[T](file).get + Some(ThriftJsonCodec.toJsonStr(configConf)) + } catch { + case e: Throwable => + logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}") + None + } + } // derive a feature name key from path to file def confPathToKey: String = { // capture // as key e.g joins/team/team.example_join.v1 - string.split("/").takeRight(3).mkString("/") + filePath.split("/").takeRight(3).mkString("/") + } + + def confPathToTeam: Option[String] = { + try { + filePath match { + case value if value.contains("staging_queries/") => loadJsonToConf[StagingQuery](value).map(_.metaData.team) + case value if value.contains("joins/") => loadJsonToConf[Join](value).map(_.metaData.team) + case value if value.contains("group_bys/") => loadJsonToConf[GroupBy](value).map(_.metaData.team) + case _ => logger.info(s"unknown config type in file $filePath"); None + } + } catch { + case e: Throwable => + logger.error(s"Failed to parse compiled team from file path: $filePath, \nerror=${e.getMessage}") + None + } + } + + def confPathToOptConfStr: Option[String] = { + val confJsonOpt = filePath match { + case value if value.contains("staging_queries/") => loadJson[StagingQuery](value) + case value if value.contains("joins/") => loadJson[Join](value) + case value if value.contains("group_bys/") => loadJson[GroupBy](value) + case _ => logger.info(s"unknown config type in file $filePath"); None + } + confJsonOpt } } diff --git a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala new file mode 100644 index 000000000..7f44248c9 --- /dev/null +++ b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala @@ -0,0 +1,86 @@ +package ai.chronon.online + +import org.slf4j.LoggerFactory +import ai.chronon.online.MetadataEndPoint +import com.google.gson.Gson + +import java.io.File +import java.nio.file.{Files, Paths} + +class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { + @transient implicit lazy val logger = LoggerFactory.getLogger(getClass) + private def listFiles(base: File, recursive: Boolean = true): Seq[File] = { + if (base.isFile) { + Seq(base) + } else { + val files = base.listFiles + val result = files.filter(_.isFile) + result ++ + files + .filter(_.isDirectory) + .filter(_ => recursive) + .flatMap(listFiles(_, recursive)) + } + } + + private def parseName(path: String): Option[String] = { + val gson = new Gson() + val reader = Files.newBufferedReader(Paths.get(path)) + try { + val map = gson.fromJson(reader, classOf[java.util.Map[String, AnyRef]]) + Option(map.get("metaData")) + .map(_.asInstanceOf[java.util.Map[String, AnyRef]]) + .map(_.get("name")) + .flatMap(Option(_)) + .map(_.asInstanceOf[String]) + } catch { + case ex: Throwable => + logger.error(s"Failed to parse Chronon config file at $path as JSON", ex) + ex.printStackTrace() + None + } + } + + lazy val fileList: Seq[File] = { + val configFile = new File(dirPath) + assert(configFile.exists(), 
s"$configFile does not exist") + logger.info(s"Uploading Chronon configs from $dirPath") + listFiles(configFile) + } + + lazy val nonEmptyFileList: Seq[File] = { + fileList + .filter { file => + val name = parseName(file.getPath) + if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}") + name.isDefined + } + } + + def extractKVPair(metadataEndPointName: String, filePath: String): (Option[String], Option[String]) = { + val endPoint: MetadataEndPoint = MetadataEndPoint.NameToEndPoint(metadataEndPointName) + val (key, value) = endPoint.extractFn(filePath) + (key, value) + } + + def run: Map[String, Map[String, List[String]]] = { + nonEmptyFileList.foldLeft(Map.empty[String, Map[String, List[String]]]) { + (acc, file) => + val kvPairToEndPoint: List[(String, (Option[String], Option[String]))] = metadataEndPointNames.map { metadataEndPointName => + (metadataEndPointName, extractKVPair(metadataEndPointName, file.getPath)) + } + kvPairToEndPoint.flatMap( + kvPair => { + val endPoint = kvPair._1 + val (key, value) = kvPair._2 + if (value.isDefined && key.isDefined) { + val map = acc.getOrElse(endPoint, Map.empty[String, List[String]]) + val list = map.getOrElse(key.get, List.empty[String]) ++ List(value.get) + acc.updated(endPoint, map.updated(key.get, list)) + } + else acc + } + ).toMap + } + } +} diff --git a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala new file mode 100644 index 000000000..6d3b3e04f --- /dev/null +++ b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala @@ -0,0 +1,25 @@ +package ai.chronon.online + +import ai.chronon.api.Extensions.{filePathOps} +import org.slf4j.LoggerFactory + +case class MetadataEndPoint (extractFn: String => (Option[String], Option[String]), name: String) + +object MetadataEndPoint { + @transient implicit lazy val logger = LoggerFactory.getLogger(getClass) + + val ConfByKeyEndPoint = new MetadataEndPoint( + extractFn = confPath => (Some(confPath.confPathToKey), confPath.confPathToOptConfStr), + name = "ZIPLINE_METADATA" + ) + + val NameByTeamEndPoint = new MetadataEndPoint( + extractFn = confPath => (confPath.confPathToTeam, confPath.confPathToOptConfStr), + name = "ZIPLINE_METADATA_BY_TEAM" + ) + + val NameToEndPoint: Map[String, MetadataEndPoint] = Map( + ConfByKeyEndPoint.name -> ConfByKeyEndPoint, + NameByTeamEndPoint.name -> NameByTeamEndPoint + ) +} \ No newline at end of file diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index f89314855..0525113e5 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -18,7 +18,7 @@ package ai.chronon.online import org.slf4j.LoggerFactory import ai.chronon.api.Constants.{ChrononMetadataKey, UTF8} -import ai.chronon.api.Extensions.{JoinOps, MetadataOps, StringOps, WindowOps, WindowUtils} +import ai.chronon.api.Extensions.{JoinOps, MetadataOps, StringOps, WindowOps, WindowUtils, filePathOps} import ai.chronon.api._ import ai.chronon.online.KVStore.{GetRequest, PutRequest, TimedValue} import com.google.gson.{Gson, GsonBuilder} @@ -26,9 +26,7 @@ import org.apache.thrift.TBase import java.io.File import java.nio.file.{Files, Paths} -import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} import scala.collection.immutable.SortedMap -import scala.collection.Seq import scala.concurrent.{ExecutionContext, Future} 
import scala.reflect.ClassTag import scala.util.{Failure, Success, Try} @@ -165,12 +163,7 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, } .flatMap { file => val path = file.getPath - val confJsonOpt = path match { - case value if value.contains("staging_queries/") => loadJson[StagingQuery](value) - case value if value.contains("joins/") => loadJson[Join](value) - case value if value.contains("group_bys/") => loadJson[GroupBy](value) - case _ => logger.info(s"unknown config type in file $path"); None - } + val confJsonOpt = path.confPathToOptConfStr val key = path.confPathToKey confJsonOpt.map { conf => logger.info(s"""Putting metadata for @@ -188,6 +181,34 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, Future.sequence(futures).map(_.flatten) } + def put( + kVPairs: Map[String, Seq[String]], + datasetName: String = ChrononMetadataKey, + batchSize: Int = CONF_BATCH_SIZE + ): Future[Seq[Boolean]] = { + val puts = kVPairs.map { + case (k, v) => { + logger.info(s"""Putting metadata for + |dataset: $datasetName + |key: $k + |conf: $v""".stripMargin) + val kBytes = k.getBytes() + val vBytes = + if (v.isEmpty) Array.emptyByteArray + else if (v.length == 1) v.head.getBytes() + else v.mkString("\n").getBytes() + PutRequest(keyBytes = kBytes, + valueBytes = vBytes, + dataset = dataset, + tsMillis = Some(System.currentTimeMillis())) + } + }.toSeq + val putsBatches = puts.grouped(batchSize).toSeq + logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$dataset") + val futures = putsBatches.map(batch => kvStore.multiPut(batch)) + Future.sequence(futures).map(_.flatten) + } + // list file recursively private def listFiles(base: File, recursive: Boolean = true): Seq[File] = { if (base.isFile) { @@ -203,19 +224,6 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, } } - // process chronon configs only. 
others will be ignored - // todo: add metrics - private def loadJson[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[String] = { - try { - val configConf = ThriftJsonCodec.fromJsonFile[T](file, check = true) - Some(ThriftJsonCodec.toJsonStr(configConf)) - } catch { - case e: Throwable => - logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}") - None - } - } - def parseName(path: String): Option[String] = { val gson = new Gson() val reader = Files.newBufferedReader(Paths.get(path)) diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index e17536c08..53a139954 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -17,9 +17,9 @@ package ai.chronon.spark import ai.chronon.api -import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps, StringOps} +import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps, StringOps, filePathOps} import ai.chronon.api.ThriftJsonCodec -import ai.chronon.online.{Api, Fetcher, MetadataStore} +import ai.chronon.online.{Api, Fetcher, MetadataDirWalker, MetadataStore} import ai.chronon.spark.stats.{CompareBaseJob, CompareJob, ConsistencyJob, SummaryJob} import ai.chronon.spark.streaming.{JoinSourceRunner, TopicChecker} import com.fasterxml.jackson.databind.ObjectMapper @@ -27,11 +27,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.commons.io.FileUtils import org.apache.spark.SparkFiles import org.apache.spark.sql.streaming.StreamingQueryListener -import org.apache.spark.sql.streaming.StreamingQueryListener.{ - QueryProgressEvent, - QueryStartedEvent, - QueryTerminatedEvent -} +import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent} import org.apache.spark.sql.{DataFrame, SparkSession, SparkSessionExtensions} import org.apache.thrift.TBase import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand} @@ -41,7 +37,7 @@ import java.io.File import java.nio.file.{Files, Paths} import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.concurrent.Await +import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration.DurationInt import scala.io.Source import scala.reflect.ClassTag @@ -732,8 +728,15 @@ object Driver { } def run(args: Args): Unit = { - val putRequest = args.metaDataStore.putConf(args.confPath()) - val res = Await.result(putRequest, 1.hour) + val acceptedEndPoints = List("ZIPLINE_METADATA") + val dirWalker = new MetadataDirWalker(args.confPath(), acceptedEndPoints) + val kvMap = dirWalker.run + implicit val ec: ExecutionContext = ExecutionContext.global + val putRequestsIterable: Iterable[Future[Seq[Boolean]]] = kvMap.map { + case (endPoint, kvMap) => args.metaDataStore.put(kvMap, endPoint) + } + val putRequests: Future[Seq[Boolean]] = Future.sequence(putRequestsIterable).flatMap(seq => Future.successful(seq.flatten.toSeq)) + val res = Await.result(putRequests, 1.hour) logger.info( s"Uploaded Chronon Configs to the KV store, success count = ${res.count(v => v)}, failure count = ${res.count(!_)}") } From 4957cef3fb25fc0f38b2b05b8071982f3f79b8e2 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 6 May 2024 15:41:14 -0700 Subject: [PATCH 02/33] reformat --- .../scala/ai/chronon/api/Extensions.scala | 6 +++--- .../ai/chronon/online/MetadataDirWalker.scala | 19 +++++++++---------- 
.../ai/chronon/online/MetadataEndPoint.scala | 4 ++-- .../main/scala/ai/chronon/spark/Driver.scala | 9 +++++++-- 4 files changed, 21 insertions(+), 17 deletions(-) diff --git a/api/src/main/scala/ai/chronon/api/Extensions.scala b/api/src/main/scala/ai/chronon/api/Extensions.scala index ccc307e1b..804a50641 100644 --- a/api/src/main/scala/ai/chronon/api/Extensions.scala +++ b/api/src/main/scala/ai/chronon/api/Extensions.scala @@ -673,9 +673,9 @@ object Extensions { try { filePath match { case value if value.contains("staging_queries/") => loadJsonToConf[StagingQuery](value).map(_.metaData.team) - case value if value.contains("joins/") => loadJsonToConf[Join](value).map(_.metaData.team) - case value if value.contains("group_bys/") => loadJsonToConf[GroupBy](value).map(_.metaData.team) - case _ => logger.info(s"unknown config type in file $filePath"); None + case value if value.contains("joins/") => loadJsonToConf[Join](value).map(_.metaData.team) + case value if value.contains("group_bys/") => loadJsonToConf[GroupBy](value).map(_.metaData.team) + case _ => logger.info(s"unknown config type in file $filePath"); None } } catch { case e: Throwable => diff --git a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala index 7f44248c9..f7f5d4c7d 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala @@ -64,23 +64,22 @@ class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { } def run: Map[String, Map[String, List[String]]] = { - nonEmptyFileList.foldLeft(Map.empty[String, Map[String, List[String]]]) { - (acc, file) => - val kvPairToEndPoint: List[(String, (Option[String], Option[String]))] = metadataEndPointNames.map { metadataEndPointName => - (metadataEndPointName, extractKVPair(metadataEndPointName, file.getPath)) + nonEmptyFileList.foldLeft(Map.empty[String, Map[String, List[String]]]) { (acc, file) => + val kvPairToEndPoint: List[(String, (Option[String], Option[String]))] = metadataEndPointNames.map { + metadataEndPointName => + (metadataEndPointName, extractKVPair(metadataEndPointName, file.getPath)) } - kvPairToEndPoint.flatMap( - kvPair => { + kvPairToEndPoint + .flatMap(kvPair => { val endPoint = kvPair._1 val (key, value) = kvPair._2 if (value.isDefined && key.isDefined) { val map = acc.getOrElse(endPoint, Map.empty[String, List[String]]) val list = map.getOrElse(key.get, List.empty[String]) ++ List(value.get) acc.updated(endPoint, map.updated(key.get, list)) - } - else acc - } - ).toMap + } else acc + }) + .toMap } } } diff --git a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala index 6d3b3e04f..f740df4b8 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala @@ -3,7 +3,7 @@ package ai.chronon.online import ai.chronon.api.Extensions.{filePathOps} import org.slf4j.LoggerFactory -case class MetadataEndPoint (extractFn: String => (Option[String], Option[String]), name: String) +case class MetadataEndPoint(extractFn: String => (Option[String], Option[String]), name: String) object MetadataEndPoint { @transient implicit lazy val logger = LoggerFactory.getLogger(getClass) @@ -22,4 +22,4 @@ object MetadataEndPoint { ConfByKeyEndPoint.name -> ConfByKeyEndPoint, NameByTeamEndPoint.name -> NameByTeamEndPoint ) -} \ No newline at end of file 
+} diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index 53a139954..9b675575f 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -27,7 +27,11 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.commons.io.FileUtils import org.apache.spark.SparkFiles import org.apache.spark.sql.streaming.StreamingQueryListener -import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent} +import org.apache.spark.sql.streaming.StreamingQueryListener.{ + QueryProgressEvent, + QueryStartedEvent, + QueryTerminatedEvent +} import org.apache.spark.sql.{DataFrame, SparkSession, SparkSessionExtensions} import org.apache.thrift.TBase import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand} @@ -735,7 +739,8 @@ object Driver { val putRequestsIterable: Iterable[Future[Seq[Boolean]]] = kvMap.map { case (endPoint, kvMap) => args.metaDataStore.put(kvMap, endPoint) } - val putRequests: Future[Seq[Boolean]] = Future.sequence(putRequestsIterable).flatMap(seq => Future.successful(seq.flatten.toSeq)) + val putRequests: Future[Seq[Boolean]] = + Future.sequence(putRequestsIterable).flatMap(seq => Future.successful(seq.flatten.toSeq)) val res = Await.result(putRequests, 1.hour) logger.info( s"Uploaded Chronon Configs to the KV store, success count = ${res.count(v => v)}, failure count = ${res.count(!_)}") From a68e52d22386fa414c47f9741a73c9a2138273c9 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 6 May 2024 16:51:24 -0700 Subject: [PATCH 03/33] update end point value --- .../src/main/scala/ai/chronon/online/MetadataEndPoint.scala | 2 +- online/src/main/scala/ai/chronon/online/MetadataStore.scala | 1 + spark/src/main/scala/ai/chronon/spark/Driver.scala | 6 +++--- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala index f740df4b8..4b13ac62f 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala @@ -14,7 +14,7 @@ object MetadataEndPoint { ) val NameByTeamEndPoint = new MetadataEndPoint( - extractFn = confPath => (confPath.confPathToTeam, confPath.confPathToOptConfStr), + extractFn = confPath => (confPath.confPathToTeam, Some(confPath.confPathToKey)), name = "ZIPLINE_METADATA_BY_TEAM" ) diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index 0525113e5..583d3f1b8 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -27,6 +27,7 @@ import org.apache.thrift.TBase import java.io.File import java.nio.file.{Files, Paths} import scala.collection.immutable.SortedMap +import scala.collection.Seq import scala.concurrent.{ExecutionContext, Future} import scala.reflect.ClassTag import scala.util.{Failure, Success, Try} diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index 9b675575f..cdbf40fc7 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -732,14 +732,14 @@ object Driver { } def run(args: Args): Unit = { - val acceptedEndPoints = List("ZIPLINE_METADATA") + val 
acceptedEndPoints = List("ZIPLINE_METADATA", "ZIPLINE_METADATA_BY_TEAM") val dirWalker = new MetadataDirWalker(args.confPath(), acceptedEndPoints) val kvMap = dirWalker.run implicit val ec: ExecutionContext = ExecutionContext.global - val putRequestsIterable: Iterable[Future[Seq[Boolean]]] = kvMap.map { + val putRequestsIterable: Iterable[Future[scala.collection.Seq[Boolean]]] = kvMap.map { case (endPoint, kvMap) => args.metaDataStore.put(kvMap, endPoint) } - val putRequests: Future[Seq[Boolean]] = + val putRequests: Future[scala.collection.Seq[Boolean]] = Future.sequence(putRequestsIterable).flatMap(seq => Future.successful(seq.flatten.toSeq)) val res = Await.result(putRequests, 1.hour) logger.info( From 2de6147754f56178f60c25a800ecd78fc3cbd0ff Mon Sep 17 00:00:00 2001 From: yuli_han Date: Tue, 7 May 2024 11:01:53 -0700 Subject: [PATCH 04/33] add entity type to the team key --- api/src/main/scala/ai/chronon/api/Extensions.scala | 9 +++++---- .../main/scala/ai/chronon/online/MetadataEndPoint.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/api/src/main/scala/ai/chronon/api/Extensions.scala b/api/src/main/scala/ai/chronon/api/Extensions.scala index 804a50641..06c474a66 100644 --- a/api/src/main/scala/ai/chronon/api/Extensions.scala +++ b/api/src/main/scala/ai/chronon/api/Extensions.scala @@ -669,12 +669,13 @@ object Extensions { filePath.split("/").takeRight(3).mkString("/") } - def confPathToTeam: Option[String] = { + // derive a feature team key from config, ex: joins/team + def confPathToTeamKey: Option[String] = { try { filePath match { - case value if value.contains("staging_queries/") => loadJsonToConf[StagingQuery](value).map(_.metaData.team) - case value if value.contains("joins/") => loadJsonToConf[Join](value).map(_.metaData.team) - case value if value.contains("group_bys/") => loadJsonToConf[GroupBy](value).map(_.metaData.team) + case value if value.contains("staging_queries/") => loadJsonToConf[StagingQuery](value).map("staging_queries" + _.metaData.team) + case value if value.contains("joins/") => loadJsonToConf[Join](value).map("joins" + _.metaData.team) + case value if value.contains("group_bys/") => loadJsonToConf[GroupBy](value).map("group_bys" + _.metaData.team) case _ => logger.info(s"unknown config type in file $filePath"); None } } catch { diff --git a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala index 4b13ac62f..684444b0d 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala @@ -14,7 +14,7 @@ object MetadataEndPoint { ) val NameByTeamEndPoint = new MetadataEndPoint( - extractFn = confPath => (confPath.confPathToTeam, Some(confPath.confPathToKey)), + extractFn = confPath => (confPath.confPathToTeamKey, Some(confPath.confPathToKey)), name = "ZIPLINE_METADATA_BY_TEAM" ) From 1bd55ebe75712bf33ecb7154574db6c8b0b1f0be Mon Sep 17 00:00:00 2001 From: yuli_han Date: Tue, 7 May 2024 11:05:18 -0700 Subject: [PATCH 05/33] add comment --- api/src/main/scala/ai/chronon/api/Extensions.scala | 4 ++-- .../src/main/scala/ai/chronon/online/MetadataEndPoint.scala | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/api/src/main/scala/ai/chronon/api/Extensions.scala b/api/src/main/scala/ai/chronon/api/Extensions.scala index 06c474a66..230b4f04b 100644 --- a/api/src/main/scala/ai/chronon/api/Extensions.scala +++ b/api/src/main/scala/ai/chronon/api/Extensions.scala @@ -664,12 
+664,12 @@ object Extensions { } } // derive a feature name key from path to file + // capture // as key e.g joins/team/team.example_join.v1 def confPathToKey: String = { - // capture // as key e.g joins/team/team.example_join.v1 filePath.split("/").takeRight(3).mkString("/") } - // derive a feature team key from config, ex: joins/team + // derive a feature team key from config, e.g: joins/team def confPathToTeamKey: Option[String] = { try { filePath match { diff --git a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala index 684444b0d..d41a2181d 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala @@ -8,11 +8,15 @@ case class MetadataEndPoint(extractFn: String => (Option[String], Option[String] object MetadataEndPoint { @transient implicit lazy val logger = LoggerFactory.getLogger(getClass) + // key: entity path, e.g. joins/team/team.example_join.v1 + // value: entity config in json format val ConfByKeyEndPoint = new MetadataEndPoint( extractFn = confPath => (Some(confPath.confPathToKey), confPath.confPathToOptConfStr), name = "ZIPLINE_METADATA" ) + // key: entity type + team name, e.g. joins/team + // value: list of entities under the team, e.g. joins/team/team.example_join.v1, joins/team/team.example_join.v2 val NameByTeamEndPoint = new MetadataEndPoint( extractFn = confPath => (confPath.confPathToTeamKey, Some(confPath.confPathToKey)), name = "ZIPLINE_METADATA_BY_TEAM" From b38a8344381beef013e4864d8f32b1ad52972c38 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Tue, 7 May 2024 11:09:26 -0700 Subject: [PATCH 06/33] fix bug --- api/src/main/scala/ai/chronon/api/Extensions.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/src/main/scala/ai/chronon/api/Extensions.scala b/api/src/main/scala/ai/chronon/api/Extensions.scala index 230b4f04b..da1e77508 100644 --- a/api/src/main/scala/ai/chronon/api/Extensions.scala +++ b/api/src/main/scala/ai/chronon/api/Extensions.scala @@ -673,9 +673,9 @@ object Extensions { def confPathToTeamKey: Option[String] = { try { filePath match { - case value if value.contains("staging_queries/") => loadJsonToConf[StagingQuery](value).map("staging_queries" + _.metaData.team) - case value if value.contains("joins/") => loadJsonToConf[Join](value).map("joins" + _.metaData.team) - case value if value.contains("group_bys/") => loadJsonToConf[GroupBy](value).map("group_bys" + _.metaData.team) + case value if value.contains("staging_queries/") => loadJsonToConf[StagingQuery](value).map("staging_queries/" + _.metaData.team) + case value if value.contains("joins/") => loadJsonToConf[Join](value).map("joins/" + _.metaData.team) + case value if value.contains("group_bys/") => loadJsonToConf[GroupBy](value).map("group_bys/" + _.metaData.team) case _ => logger.info(s"unknown config type in file $filePath"); None } } catch { From b694d7f9946369f1bb62ae94276cc26254ff4d01 Mon Sep 17 00:00:00 2001 From: hanyuli1995 <31667422+hanyuli1995@users.noreply.github.com> Date: Thu, 9 May 2024 09:46:10 -0700 Subject: [PATCH 07/33] Update online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala Co-authored-by: Nikhil Signed-off-by: hanyuli1995 <31667422+hanyuli1995@users.noreply.github.com> --- .../src/main/scala/ai/chronon/online/MetadataDirWalker.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala 
b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala index f7f5d4c7d..e21553b94 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala @@ -30,8 +30,7 @@ class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { val map = gson.fromJson(reader, classOf[java.util.Map[String, AnyRef]]) Option(map.get("metaData")) .map(_.asInstanceOf[java.util.Map[String, AnyRef]]) - .map(_.get("name")) - .flatMap(Option(_)) + .flatMap(Option(_.get("name"))) .map(_.asInstanceOf[String]) } catch { case ex: Throwable => From 2b89ea79186556f580c50ce6f67009f826c51bc9 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Thu, 9 May 2024 14:19:10 -0700 Subject: [PATCH 08/33] fix bug --- .../src/main/scala/ai/chronon/online/MetadataDirWalker.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala index e21553b94..f7f5d4c7d 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala @@ -30,7 +30,8 @@ class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { val map = gson.fromJson(reader, classOf[java.util.Map[String, AnyRef]]) Option(map.get("metaData")) .map(_.asInstanceOf[java.util.Map[String, AnyRef]]) - .flatMap(Option(_.get("name"))) + .map(_.get("name")) + .flatMap(Option(_)) .map(_.asInstanceOf[String]) } catch { case ex: Throwable => From b4c636088d0f6e67c034f6e1bbf7538d1c0ffd45 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Fri, 10 May 2024 13:32:28 -0700 Subject: [PATCH 09/33] fix bugs --- .../scala/ai/chronon/api/Extensions.scala | 18 ----- .../ai/chronon/online/MetadataDirWalker.scala | 69 ++++++++++++++----- .../ai/chronon/online/MetadataEndPoint.scala | 57 +++++++++++---- .../ai/chronon/online/MetadataStore.scala | 2 +- .../main/scala/ai/chronon/spark/Driver.scala | 11 ++- 5 files changed, 103 insertions(+), 54 deletions(-) diff --git a/api/src/main/scala/ai/chronon/api/Extensions.scala b/api/src/main/scala/ai/chronon/api/Extensions.scala index da1e77508..d062aa6e4 100644 --- a/api/src/main/scala/ai/chronon/api/Extensions.scala +++ b/api/src/main/scala/ai/chronon/api/Extensions.scala @@ -640,8 +640,6 @@ object Extensions { implicit class filePathOps(filePath: String) { @transient lazy val logger = LoggerFactory.getLogger(getClass) - // process chronon configs only. 
others will be ignored - // todo: add metrics private def loadJsonToConf[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[T] = { try { @@ -669,22 +667,6 @@ object Extensions { filePath.split("/").takeRight(3).mkString("/") } - // derive a feature team key from config, e.g: joins/team - def confPathToTeamKey: Option[String] = { - try { - filePath match { - case value if value.contains("staging_queries/") => loadJsonToConf[StagingQuery](value).map("staging_queries/" + _.metaData.team) - case value if value.contains("joins/") => loadJsonToConf[Join](value).map("joins/" + _.metaData.team) - case value if value.contains("group_bys/") => loadJsonToConf[GroupBy](value).map("group_bys/" + _.metaData.team) - case _ => logger.info(s"unknown config type in file $filePath"); None - } - } catch { - case e: Throwable => - logger.error(s"Failed to parse compiled team from file path: $filePath, \nerror=${e.getMessage}") - None - } - } - def confPathToOptConfStr: Option[String] = { val confJsonOpt = filePath match { case value if value.contains("staging_queries/") => loadJson[StagingQuery](value) diff --git a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala index f7f5d4c7d..11eb0522e 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala @@ -1,13 +1,16 @@ package ai.chronon.online +import ai.chronon.api.ThriftJsonCodec import org.slf4j.LoggerFactory import ai.chronon.online.MetadataEndPoint import com.google.gson.Gson +import org.apache.thrift.TBase import java.io.File import java.nio.file.{Files, Paths} +import scala.reflect.ClassTag -class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { +class MetadataDirWalker[Conf <: TBase[_, _]: Manifest: ClassTag](dirPath: String, metadataEndPointNames: List[String]) { @transient implicit lazy val logger = LoggerFactory.getLogger(getClass) private def listFiles(base: File, recursive: Boolean = true): Seq[File] = { if (base.isFile) { @@ -23,6 +26,16 @@ class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { } } + private def loadJsonToConf[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[T] = { + try { + val configConf = ThriftJsonCodec.fromJsonFile[T](file, check = true) + Some(configConf) + } catch { + case e: Throwable => + logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}") + None + } + } private def parseName(path: String): Option[String] = { val gson = new Gson() val reader = Files.newBufferedReader(Paths.get(path)) @@ -57,29 +70,47 @@ class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { } } - def extractKVPair(metadataEndPointName: String, filePath: String): (Option[String], Option[String]) = { - val endPoint: MetadataEndPoint = MetadataEndPoint.NameToEndPoint(metadataEndPointName) - val (key, value) = endPoint.extractFn(filePath) - (key, value) + private def extractkvPairToEndPoint(filePath: String, conf: Conf): List[(String, (String, String))] = { + metadataEndPointNames.map { endPointName => + (endPointName, MetadataEndPoint.getEndPoint(endPointName).extractFn(filePath, conf)) + } } + /** + * Iterate over the list of files and extract the key value pairs for each file + * @return Map of endpoint -> (Map of key -> List of values) + * e.g. 
( + * ZIPLINE_METADATA_BY_TEAM -> (team -> List("join1", "join2")), + * ZIPLINE_METADATA -> (teams/joins/join1 -> config1) + * ) + */ def run: Map[String, Map[String, List[String]]] = { nonEmptyFileList.foldLeft(Map.empty[String, Map[String, List[String]]]) { (acc, file) => - val kvPairToEndPoint: List[(String, (Option[String], Option[String]))] = metadataEndPointNames.map { - metadataEndPointName => - (metadataEndPointName, extractKVPair(metadataEndPointName, file.getPath)) - } - kvPairToEndPoint - .flatMap(kvPair => { - val endPoint = kvPair._1 - val (key, value) = kvPair._2 - if (value.isDefined && key.isDefined) { + // For each end point we apply the extractFn to the file path to extract the key value pair + val filePath = file.getPath + val optConf = + try { + loadJsonToConf[Conf](filePath) + } catch { + case e: Throwable => + logger.error(s"Failed to parse compiled team from file path: $filePath, \nerror=${e.getMessage}") + None + } + if (optConf.isDefined) { + val kvPairToEndPoint: List[(String, (String, String))] = extractkvPairToEndPoint(filePath, optConf.get) + kvPairToEndPoint + .flatMap(kvPair => { + val endPoint = kvPair._1 + val (key, value) = kvPair._2 val map = acc.getOrElse(endPoint, Map.empty[String, List[String]]) - val list = map.getOrElse(key.get, List.empty[String]) ++ List(value.get) - acc.updated(endPoint, map.updated(key.get, list)) - } else acc - }) - .toMap + val list = map.getOrElse(key, List.empty[String]) ++ List(value) + acc.updated(endPoint, map.updated(key, list)) + }) + .toMap + } else { + logger.info(s"Skipping invalid file ${file.getPath}") + acc + } } } } diff --git a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala index d41a2181d..771b508de 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala @@ -1,29 +1,56 @@ package ai.chronon.online -import ai.chronon.api.Extensions.{filePathOps} +import ai.chronon.api.Extensions.filePathOps +import ai.chronon.api.{GroupBy, Join, StagingQuery, ThriftJsonCodec} +import org.apache.thrift.TBase import org.slf4j.LoggerFactory -case class MetadataEndPoint(extractFn: String => (Option[String], Option[String]), name: String) +import scala.reflect.ClassTag +case class MetadataEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag]( + extractFn: (String, Conf) => (String, String), + name: String +) object MetadataEndPoint { @transient implicit lazy val logger = LoggerFactory.getLogger(getClass) + val ConfByKeyEndPointName = "ZIPLINE_METADATA" + val NameByTeamEndPointName = "ZIPLINE_METADATA_BY_TEAM" + + private def parseTeam[Conf <: TBase[_, _]: Manifest: ClassTag](conf: Conf): String = { + conf match { + case join: Join => "joins/" + join.metaData.team + case groupBy: GroupBy => "group_bys/" + groupBy.metaData.team + case stagingQuery: StagingQuery => "staging_queries/" + stagingQuery.metaData.team + case _ => + logger.error(s"Failed to parse team from $conf") + throw new Exception(s"Failed to parse team from $conf") + } + } + // key: entity path, e.g. 
joins/team/team.example_join.v1 // value: entity config in json format - val ConfByKeyEndPoint = new MetadataEndPoint( - extractFn = confPath => (Some(confPath.confPathToKey), confPath.confPathToOptConfStr), - name = "ZIPLINE_METADATA" - ) + private def confByKeyEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag] = + new MetadataEndPoint[Conf]( + extractFn = (path, conf) => (path.confPathToKey, ThriftJsonCodec.toJsonStr(conf)), + name = ConfByKeyEndPointName + ) // key: entity type + team name, e.g. joins/team // value: list of entities under the team, e.g. joins/team/team.example_join.v1, joins/team/team.example_join.v2 - val NameByTeamEndPoint = new MetadataEndPoint( - extractFn = confPath => (confPath.confPathToTeamKey, Some(confPath.confPathToKey)), - name = "ZIPLINE_METADATA_BY_TEAM" - ) - - val NameToEndPoint: Map[String, MetadataEndPoint] = Map( - ConfByKeyEndPoint.name -> ConfByKeyEndPoint, - NameByTeamEndPoint.name -> NameByTeamEndPoint - ) + private def NameByTeamEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag] = + new MetadataEndPoint[Conf]( + extractFn = (path, conf) => (parseTeam[Conf](conf), path.confPathToKey), + name = NameByTeamEndPointName + ) + + def getEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag](endPointName: String): MetadataEndPoint[Conf] = { + endPointName match { + case ConfByKeyEndPointName => confByKeyEndPoint[Conf] + case NameByTeamEndPointName => NameByTeamEndPoint[Conf] + case _ => + logger.error(s"Failed to find endpoint for $endPointName") + throw new Exception(s"Failed to find endpoint for $endPointName") + } + } } diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index 583d3f1b8..52c389e97 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -38,7 +38,7 @@ case class DataMetrics(series: Seq[(Long, SortedMap[String, Any])]) class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, timeoutMillis: Long) { @transient implicit lazy val logger = LoggerFactory.getLogger(getClass) private var partitionSpec = PartitionSpec(format = "yyyy-MM-dd", spanMillis = WindowUtils.Day.millis) - private val CONF_BATCH_SIZE = 50 + private val CONF_BATCH_SIZE = 100 // Note this should match with the format used in the warehouse def setPartitionMeta(format: String, spanMillis: Long): Unit = { diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index cdbf40fc7..e31d38fa3 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -729,11 +729,20 @@ object Driver { class Args extends Subcommand("metadata-upload") with OnlineSubcommand { val confPath: ScallopOption[String] = opt[String](required = true, descr = "Path to the Chronon config file or directory") + val `type`: ScallopOption[String] = + choice(Seq("join", "group-by", "staging-query"), descr = "the type of conf to fetch", default = Some("join")) } def run(args: Args): Unit = { val acceptedEndPoints = List("ZIPLINE_METADATA", "ZIPLINE_METADATA_BY_TEAM") - val dirWalker = new MetadataDirWalker(args.confPath(), acceptedEndPoints) + val dirWalker = { + args.confPath() match { + case value if value.contains("joins/") => new MetadataDirWalker[api.Join](args.confPath(), acceptedEndPoints) + case value if value.contains("group_bys/") => new MetadataDirWalker[api.GroupBy](args.confPath(), acceptedEndPoints) + 
case value if value.contains("staging_queries/") => new MetadataDirWalker[api.StagingQuery](args.confPath(), acceptedEndPoints) + } + } + val kvMap = dirWalker.run implicit val ec: ExecutionContext = ExecutionContext.global val putRequestsIterable: Iterable[Future[scala.collection.Seq[Boolean]]] = kvMap.map { From ed0ae291908f94bfded0d7c872667448faaf779e Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 13 May 2024 09:57:28 -0700 Subject: [PATCH 10/33] remove unused code --- spark/src/main/scala/ai/chronon/spark/Driver.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index e31d38fa3..d7cee8939 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -729,8 +729,6 @@ object Driver { class Args extends Subcommand("metadata-upload") with OnlineSubcommand { val confPath: ScallopOption[String] = opt[String](required = true, descr = "Path to the Chronon config file or directory") - val `type`: ScallopOption[String] = - choice(Seq("join", "group-by", "staging-query"), descr = "the type of conf to fetch", default = Some("join")) } def run(args: Args): Unit = { From 3640121ea974c85013b4bafd1ffce5f22a91f635 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 13 May 2024 13:16:53 -0700 Subject: [PATCH 11/33] fix bugs --- .../ai/chronon/online/MetadataDirWalker.scala | 35 +++++++++++-------- .../main/scala/ai/chronon/spark/Driver.scala | 19 ++++------ 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala index 11eb0522e..d13457bb5 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala @@ -1,8 +1,8 @@ package ai.chronon.online import ai.chronon.api.ThriftJsonCodec +import ai.chronon.api import org.slf4j.LoggerFactory -import ai.chronon.online.MetadataEndPoint import com.google.gson.Gson import org.apache.thrift.TBase @@ -10,7 +10,7 @@ import java.io.File import java.nio.file.{Files, Paths} import scala.reflect.ClassTag -class MetadataDirWalker[Conf <: TBase[_, _]: Manifest: ClassTag](dirPath: String, metadataEndPointNames: List[String]) { +class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { @transient implicit lazy val logger = LoggerFactory.getLogger(getClass) private def listFiles(base: File, recursive: Boolean = true): Seq[File] = { if (base.isFile) { @@ -70,12 +70,6 @@ class MetadataDirWalker[Conf <: TBase[_, _]: Manifest: ClassTag](dirPath: String } } - private def extractkvPairToEndPoint(filePath: String, conf: Conf): List[(String, (String, String))] = { - metadataEndPointNames.map { endPointName => - (endPointName, MetadataEndPoint.getEndPoint(endPointName).extractFn(filePath, conf)) - } - } - /** * Iterate over the list of files and extract the key value pairs for each file * @return Map of endpoint -> (Map of key -> List of values) @@ -90,23 +84,36 @@ class MetadataDirWalker[Conf <: TBase[_, _]: Manifest: ClassTag](dirPath: String val filePath = file.getPath val optConf = try { - loadJsonToConf[Conf](filePath) + filePath match { + case value if value.contains("joins/") => loadJsonToConf[api.Join](filePath) + case value if value.contains("group_bys/") => loadJsonToConf[api.GroupBy](filePath) + case value if value.contains("staging_queries/") 
=>loadJsonToConf[api.StagingQuery](filePath) + } } catch { case e: Throwable => logger.error(s"Failed to parse compiled team from file path: $filePath, \nerror=${e.getMessage}") None } if (optConf.isDefined) { - val kvPairToEndPoint: List[(String, (String, String))] = extractkvPairToEndPoint(filePath, optConf.get) + val kvPairToEndPoint: List[(String, (String, String))] = metadataEndPointNames.map { endPointName => + val conf = optConf.get + val kVPair = filePath match { + case value if value.contains("joins/") => MetadataEndPoint.getEndPoint[api.Join](endPointName).extractFn(filePath, conf.asInstanceOf[api.Join]) + case value if value.contains("group_bys/") => MetadataEndPoint.getEndPoint[api.GroupBy](endPointName).extractFn(filePath, conf.asInstanceOf[api.GroupBy]) + case value if value.contains("staging_queries/") => MetadataEndPoint.getEndPoint[api.StagingQuery](endPointName).extractFn(filePath, conf.asInstanceOf[api.StagingQuery]) + } + (endPointName, kVPair) + } + logger.info(s"Skipping invalid file ${kvPairToEndPoint}") + kvPairToEndPoint - .flatMap(kvPair => { + .map(kvPair => { val endPoint = kvPair._1 val (key, value) = kvPair._2 val map = acc.getOrElse(endPoint, Map.empty[String, List[String]]) val list = map.getOrElse(key, List.empty[String]) ++ List(value) - acc.updated(endPoint, map.updated(key, list)) - }) - .toMap + (endPoint, map.updated(key, list)) + }).toMap } else { logger.info(s"Skipping invalid file ${file.getPath}") acc diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index d7cee8939..d5aee7d99 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -733,22 +733,15 @@ object Driver { def run(args: Args): Unit = { val acceptedEndPoints = List("ZIPLINE_METADATA", "ZIPLINE_METADATA_BY_TEAM") - val dirWalker = { - args.confPath() match { - case value if value.contains("joins/") => new MetadataDirWalker[api.Join](args.confPath(), acceptedEndPoints) - case value if value.contains("group_bys/") => new MetadataDirWalker[api.GroupBy](args.confPath(), acceptedEndPoints) - case value if value.contains("staging_queries/") => new MetadataDirWalker[api.StagingQuery](args.confPath(), acceptedEndPoints) - } - } - - val kvMap = dirWalker.run + val dirWalker = new MetadataDirWalker(args.confPath(), acceptedEndPoints) + val kvMap: Map[String, Map[String, List[String]]] = dirWalker.run implicit val ec: ExecutionContext = ExecutionContext.global - val putRequestsIterable: Iterable[Future[scala.collection.Seq[Boolean]]] = kvMap.map { + val putRequestsSeq: Seq[Future[scala.collection.Seq[Boolean]]] = kvMap.toSeq.map { case (endPoint, kvMap) => args.metaDataStore.put(kvMap, endPoint) } - val putRequests: Future[scala.collection.Seq[Boolean]] = - Future.sequence(putRequestsIterable).flatMap(seq => Future.successful(seq.flatten.toSeq)) - val res = Await.result(putRequests, 1.hour) + val res = putRequestsSeq.flatMap( + putRequests => Await.result(putRequests, 1.hour) + ) logger.info( s"Uploaded Chronon Configs to the KV store, success count = ${res.count(v => v)}, failure count = ${res.count(!_)}") } From dea0a112e1455fd0a8e2d6790eb15e70ac6c9cd7 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 13 May 2024 13:25:06 -0700 Subject: [PATCH 12/33] fix bugs --- .../ai/chronon/online/MetadataStore.scala | 67 ------------------- 1 file changed, 67 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala 
b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index 52c389e97..a82686e94 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -148,40 +148,6 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, }, { gb => Metrics.Context(environment = "group_by.serving_info.fetch", groupBy = gb) }) - // upload the materialized JSONs to KV store: - // key = // in bytes e.g joins/team/team.example_join.v1 value = materialized json string in bytes - def putConf(configPath: String): Future[Seq[Boolean]] = { - val configFile = new File(configPath) - assert(configFile.exists(), s"$configFile does not exist") - logger.info(s"Uploading Chronon configs from $configPath") - val fileList = listFiles(configFile) - - val puts = fileList - .filter { file => - val name = parseName(file.getPath) - if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}") - name.isDefined - } - .flatMap { file => - val path = file.getPath - val confJsonOpt = path.confPathToOptConfStr - val key = path.confPathToKey - confJsonOpt.map { conf => - logger.info(s"""Putting metadata for - |key: $key - |conf: $conf""".stripMargin) - PutRequest(keyBytes = key.getBytes(), - valueBytes = conf.getBytes(), - dataset = dataset, - tsMillis = Some(System.currentTimeMillis())) - } - } - val putsBatches = puts.grouped(CONF_BATCH_SIZE).toSeq - logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$dataset") - val futures = putsBatches.map(batch => kvStore.multiPut(batch)) - Future.sequence(futures).map(_.flatten) - } - def put( kVPairs: Map[String, Seq[String]], datasetName: String = ChrononMetadataKey, @@ -209,37 +175,4 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, val futures = putsBatches.map(batch => kvStore.multiPut(batch)) Future.sequence(futures).map(_.flatten) } - - // list file recursively - private def listFiles(base: File, recursive: Boolean = true): Seq[File] = { - if (base.isFile) { - Seq(base) - } else { - val files = base.listFiles - val result = files.filter(_.isFile) - result ++ - files - .filter(_.isDirectory) - .filter(_ => recursive) - .flatMap(listFiles(_, recursive)) - } - } - - def parseName(path: String): Option[String] = { - val gson = new Gson() - val reader = Files.newBufferedReader(Paths.get(path)) - try { - val map = gson.fromJson(reader, classOf[java.util.Map[String, AnyRef]]) - Option(map.get("metaData")) - .map(_.asInstanceOf[java.util.Map[String, AnyRef]]) - .map(_.get("name")) - .flatMap(Option(_)) - .map(_.asInstanceOf[String]) - } catch { - case ex: Throwable => - logger.error(s"Failed to parse Chronon config file at $path as JSON", ex) - ex.printStackTrace() - None - } - } } From bc9055b4fe70d2ca65474487b4540131afb5067c Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 13 May 2024 13:26:20 -0700 Subject: [PATCH 13/33] use the real dataset name --- online/src/main/scala/ai/chronon/online/MetadataStore.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index a82686e94..f8bd00a6d 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -166,12 +166,12 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, else v.mkString("\n").getBytes() PutRequest(keyBytes = kBytes, 
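                     // note: vBytes above concatenates all values for the key; the resulting
                     // PutRequests are grouped into `batchSize` batches and written via kvStore.multiPut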
valueBytes = vBytes, - dataset = dataset, + dataset = datasetName, tsMillis = Some(System.currentTimeMillis())) } }.toSeq val putsBatches = puts.grouped(batchSize).toSeq - logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$dataset") + logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$datasetName") val futures = putsBatches.map(batch => kvStore.multiPut(batch)) Future.sequence(futures).map(_.flatten) } From 03bbaeddc72b6c208ec05b30895cb92f83c2ad4c Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 13 May 2024 13:29:46 -0700 Subject: [PATCH 14/33] remove end points which are not supported for now --- spark/src/main/scala/ai/chronon/spark/Driver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index d5aee7d99..817f3d30d 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -732,7 +732,7 @@ object Driver { } def run(args: Args): Unit = { - val acceptedEndPoints = List("ZIPLINE_METADATA", "ZIPLINE_METADATA_BY_TEAM") + val acceptedEndPoints = List("ZIPLINE_METADATA") val dirWalker = new MetadataDirWalker(args.confPath(), acceptedEndPoints) val kvMap: Map[String, Map[String, List[String]]] = dirWalker.run implicit val ec: ExecutionContext = ExecutionContext.global From ad23ee51b33e286450ee860dda9a299f48349543 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 13 May 2024 14:51:57 -0700 Subject: [PATCH 15/33] remove unused change --- .../scala/ai/chronon/api/Extensions.scala | 40 +------------------ 1 file changed, 2 insertions(+), 38 deletions(-) diff --git a/api/src/main/scala/ai/chronon/api/Extensions.scala b/api/src/main/scala/ai/chronon/api/Extensions.scala index d062aa6e4..9f0acac8b 100644 --- a/api/src/main/scala/ai/chronon/api/Extensions.scala +++ b/api/src/main/scala/ai/chronon/api/Extensions.scala @@ -23,13 +23,11 @@ import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import org.apache.spark.sql.Column import org.apache.spark.sql.functions.expr -import org.apache.thrift.TBase import java.io.{PrintWriter, StringWriter} import java.util import java.util.regex.Pattern import scala.collection.{Seq, mutable} -import scala.reflect.ClassTag import scala.util.ScalaJavaConversions.{IteratorOps, ListOps, MapOps} import scala.util.{Failure, Success, Try} @@ -636,45 +634,11 @@ object Extensions { def sanitize: String = Option(string).map(_.replaceAll("[^a-zA-Z0-9_]", "_")).orNull def cleanSpec: String = string.split("/").head - } - - implicit class filePathOps(filePath: String) { - @transient lazy val logger = LoggerFactory.getLogger(getClass) - private def loadJsonToConf[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[T] = { - try { - val configConf = ThriftJsonCodec.fromJsonFile[T](file, check = true) - Some(configConf) - } catch { - case e: Throwable => - logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}") - None - } - } - private def loadJson[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[String] = { - try { - val configConf = loadJsonToConf[T](file).get - Some(ThriftJsonCodec.toJsonStr(configConf)) - } catch { - case e: Throwable => - logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}") - None - } - } // derive a feature name key from path to file - // capture // as key e.g joins/team/team.example_join.v1 def 
confPathToKey: String = { - filePath.split("/").takeRight(3).mkString("/") - } - - def confPathToOptConfStr: Option[String] = { - val confJsonOpt = filePath match { - case value if value.contains("staging_queries/") => loadJson[StagingQuery](value) - case value if value.contains("joins/") => loadJson[Join](value) - case value if value.contains("group_bys/") => loadJson[GroupBy](value) - case _ => logger.info(s"unknown config type in file $filePath"); None - } - confJsonOpt + // capture // as key e.g joins/team/team.example_join.v1 + string.split("/").takeRight(3).mkString("/") } } From b26c67f876d8b04d803d3583646532e9578471a8 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 13 May 2024 14:59:05 -0700 Subject: [PATCH 16/33] fix bug --- online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala | 2 +- online/src/main/scala/ai/chronon/online/MetadataStore.scala | 2 +- spark/src/main/scala/ai/chronon/spark/Driver.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala index 771b508de..5a8365530 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala @@ -1,6 +1,6 @@ package ai.chronon.online -import ai.chronon.api.Extensions.filePathOps +import ai.chronon.api.Extensions.StringOps import ai.chronon.api.{GroupBy, Join, StagingQuery, ThriftJsonCodec} import org.apache.thrift.TBase import org.slf4j.LoggerFactory diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index f8bd00a6d..d5c07e9c8 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -18,7 +18,7 @@ package ai.chronon.online import org.slf4j.LoggerFactory import ai.chronon.api.Constants.{ChrononMetadataKey, UTF8} -import ai.chronon.api.Extensions.{JoinOps, MetadataOps, StringOps, WindowOps, WindowUtils, filePathOps} +import ai.chronon.api.Extensions.{JoinOps, MetadataOps, StringOps, WindowOps, WindowUtils} import ai.chronon.api._ import ai.chronon.online.KVStore.{GetRequest, PutRequest, TimedValue} import com.google.gson.{Gson, GsonBuilder} diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index 817f3d30d..10b1bfc09 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -17,7 +17,7 @@ package ai.chronon.spark import ai.chronon.api -import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps, StringOps, filePathOps} +import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps, StringOps} import ai.chronon.api.ThriftJsonCodec import ai.chronon.online.{Api, Fetcher, MetadataDirWalker, MetadataStore} import ai.chronon.spark.stats.{CompareBaseJob, CompareJob, ConsistencyJob, SummaryJob} From b4962bed4b73af614d7cb74a3d57ed38a20c6a94 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Tue, 14 May 2024 10:05:19 -0700 Subject: [PATCH 17/33] reformat --- .../ai/chronon/online/MetadataDirWalker.scala | 23 ++++++++++++------- .../main/scala/ai/chronon/spark/Driver.scala | 4 +--- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala index d13457bb5..2a0e46e55 100644 
--- a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala @@ -85,9 +85,9 @@ class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { val optConf = try { filePath match { - case value if value.contains("joins/") => loadJsonToConf[api.Join](filePath) - case value if value.contains("group_bys/") => loadJsonToConf[api.GroupBy](filePath) - case value if value.contains("staging_queries/") =>loadJsonToConf[api.StagingQuery](filePath) + case value if value.contains("joins/") => loadJsonToConf[api.Join](filePath) + case value if value.contains("group_bys/") => loadJsonToConf[api.GroupBy](filePath) + case value if value.contains("staging_queries/") => loadJsonToConf[api.StagingQuery](filePath) } } catch { case e: Throwable => @@ -98,13 +98,19 @@ class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { val kvPairToEndPoint: List[(String, (String, String))] = metadataEndPointNames.map { endPointName => val conf = optConf.get val kVPair = filePath match { - case value if value.contains("joins/") => MetadataEndPoint.getEndPoint[api.Join](endPointName).extractFn(filePath, conf.asInstanceOf[api.Join]) - case value if value.contains("group_bys/") => MetadataEndPoint.getEndPoint[api.GroupBy](endPointName).extractFn(filePath, conf.asInstanceOf[api.GroupBy]) - case value if value.contains("staging_queries/") => MetadataEndPoint.getEndPoint[api.StagingQuery](endPointName).extractFn(filePath, conf.asInstanceOf[api.StagingQuery]) + case value if value.contains("joins/") => + MetadataEndPoint.getEndPoint[api.Join](endPointName).extractFn(filePath, conf.asInstanceOf[api.Join]) + case value if value.contains("group_bys/") => + MetadataEndPoint + .getEndPoint[api.GroupBy](endPointName) + .extractFn(filePath, conf.asInstanceOf[api.GroupBy]) + case value if value.contains("staging_queries/") => + MetadataEndPoint + .getEndPoint[api.StagingQuery](endPointName) + .extractFn(filePath, conf.asInstanceOf[api.StagingQuery]) } (endPointName, kVPair) } - logger.info(s"Skipping invalid file ${kvPairToEndPoint}") kvPairToEndPoint .map(kvPair => { @@ -113,7 +119,8 @@ class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { val map = acc.getOrElse(endPoint, Map.empty[String, List[String]]) val list = map.getOrElse(key, List.empty[String]) ++ List(value) (endPoint, map.updated(key, list)) - }).toMap + }) + .toMap } else { logger.info(s"Skipping invalid file ${file.getPath}") acc diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index 10b1bfc09..6076dad13 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -739,9 +739,7 @@ object Driver { val putRequestsSeq: Seq[Future[scala.collection.Seq[Boolean]]] = kvMap.toSeq.map { case (endPoint, kvMap) => args.metaDataStore.put(kvMap, endPoint) } - val res = putRequestsSeq.flatMap( - putRequests => Await.result(putRequests, 1.hour) - ) + val res = putRequestsSeq.flatMap(putRequests => Await.result(putRequests, 1.hour)) logger.info( s"Uploaded Chronon Configs to the KV store, success count = ${res.count(v => v)}, failure count = ${res.count(!_)}") } From 19664365ace064476d49b6830c980612fa2627d5 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Wed, 15 May 2024 10:01:29 -0700 Subject: [PATCH 18/33] use chronon instead of zipline --- .../src/main/scala/ai/chronon/online/MetadataDirWalker.scala | 4 ++-- 
.../src/main/scala/ai/chronon/online/MetadataEndPoint.scala | 4 ++-- online/src/main/scala/ai/chronon/online/MetadataStore.scala | 2 +- spark/src/main/scala/ai/chronon/spark/Driver.scala | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala index 2a0e46e55..102a0eb1f 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala @@ -74,8 +74,8 @@ class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) { * Iterate over the list of files and extract the key value pairs for each file * @return Map of endpoint -> (Map of key -> List of values) * e.g. ( - * ZIPLINE_METADATA_BY_TEAM -> (team -> List("join1", "join2")), - * ZIPLINE_METADATA -> (teams/joins/join1 -> config1) + * CHRONON_METADATA_BY_TEAM -> (team -> List("join1", "join2")), + * CHRONON_METADATA -> (teams/joins/join1 -> config1) * ) */ def run: Map[String, Map[String, List[String]]] = { diff --git a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala index 5a8365530..894295269 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala @@ -14,8 +14,8 @@ case class MetadataEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag]( object MetadataEndPoint { @transient implicit lazy val logger = LoggerFactory.getLogger(getClass) - val ConfByKeyEndPointName = "ZIPLINE_METADATA" - val NameByTeamEndPointName = "ZIPLINE_METADATA_BY_TEAM" + val ConfByKeyEndPointName = "CHRONON_METADATA" + val NameByTeamEndPointName = "CHRONON_METADATA_BY_TEAM" private def parseTeam[Conf <: TBase[_, _]: Manifest: ClassTag](conf: Conf): String = { conf match { diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index d5c07e9c8..22abe5059 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -163,7 +163,7 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, val vBytes = if (v.isEmpty) Array.emptyByteArray else if (v.length == 1) v.head.getBytes() - else v.mkString("\n").getBytes() + else v.mkString(",").getBytes() PutRequest(keyBytes = kBytes, valueBytes = vBytes, dataset = datasetName, diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index 6076dad13..eb518a047 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -732,7 +732,7 @@ object Driver { } def run(args: Args): Unit = { - val acceptedEndPoints = List("ZIPLINE_METADATA") + val acceptedEndPoints = List("CHRONON_METADATA") val dirWalker = new MetadataDirWalker(args.confPath(), acceptedEndPoints) val kvMap: Map[String, Map[String, List[String]]] = dirWalker.run implicit val ec: ExecutionContext = ExecutionContext.global From 8b257e6e2c371c3a3e17737d97b2fae2d58d459b Mon Sep 17 00:00:00 2001 From: yuli_han Date: Wed, 15 May 2024 11:27:54 -0700 Subject: [PATCH 19/33] use constant --- spark/src/main/scala/ai/chronon/spark/Driver.scala | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala 
b/spark/src/main/scala/ai/chronon/spark/Driver.scala index eb518a047..bac7c5b1e 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -19,7 +19,7 @@ package ai.chronon.spark import ai.chronon.api import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps, StringOps} import ai.chronon.api.ThriftJsonCodec -import ai.chronon.online.{Api, Fetcher, MetadataDirWalker, MetadataStore} +import ai.chronon.online.{Api, Fetcher, MetadataDirWalker, MetadataEndPoint, MetadataStore} import ai.chronon.spark.stats.{CompareBaseJob, CompareJob, ConsistencyJob, SummaryJob} import ai.chronon.spark.streaming.{JoinSourceRunner, TopicChecker} import com.fasterxml.jackson.databind.ObjectMapper @@ -27,11 +27,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.commons.io.FileUtils import org.apache.spark.SparkFiles import org.apache.spark.sql.streaming.StreamingQueryListener -import org.apache.spark.sql.streaming.StreamingQueryListener.{ - QueryProgressEvent, - QueryStartedEvent, - QueryTerminatedEvent -} +import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent} import org.apache.spark.sql.{DataFrame, SparkSession, SparkSessionExtensions} import org.apache.thrift.TBase import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand} @@ -732,7 +728,7 @@ object Driver { } def run(args: Args): Unit = { - val acceptedEndPoints = List("CHRONON_METADATA") + val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName) val dirWalker = new MetadataDirWalker(args.confPath(), acceptedEndPoints) val kvMap: Map[String, Map[String, List[String]]] = dirWalker.run implicit val ec: ExecutionContext = ExecutionContext.global From a01d799d5c67e0e735682063ed636a358917a91a Mon Sep 17 00:00:00 2001 From: yuli_han Date: Thu, 16 May 2024 15:08:55 -0700 Subject: [PATCH 20/33] revert the name back to the normal name --- online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala index 894295269..68c1eeb3b 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala @@ -14,7 +14,7 @@ case class MetadataEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag]( object MetadataEndPoint { @transient implicit lazy val logger = LoggerFactory.getLogger(getClass) - val ConfByKeyEndPointName = "CHRONON_METADATA" + val ConfByKeyEndPointName = "ZIPLINE_METADATA" val NameByTeamEndPointName = "CHRONON_METADATA_BY_TEAM" private def parseTeam[Conf <: TBase[_, _]: Manifest: ClassTag](conf: Conf): String = { From a6b0f91f82b87b8d2ab61d89698fddd79c3dc441 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Fri, 17 May 2024 11:30:27 -0700 Subject: [PATCH 21/33] fix tests --- .../scala/ai/chronon/spark/test/FetcherTest.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index 3f10a9ed5..759b497ca 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -21,11 +21,11 @@ import ai.chronon.aggregator.test.Column import ai.chronon.aggregator.windowing.TsUtils import 
ai.chronon.api import ai.chronon.api.Constants.ChrononMetadataKey -import ai.chronon.api.Extensions.{JoinOps, MetadataOps, DerivationOps} +import ai.chronon.api.Extensions.{DerivationOps, JoinOps, MetadataOps} import ai.chronon.api._ import ai.chronon.online.Fetcher.{Request, Response, StatsRequest} import ai.chronon.online.KVStore.GetRequest -import ai.chronon.online.{JavaRequest, LoggableResponseBase64, MetadataStore, SparkConversions} +import ai.chronon.online.{JavaRequest, LoggableResponseBase64, MetadataDirWalker, MetadataEndPoint, MetadataStore, SparkConversions} import ai.chronon.spark.Extensions._ import ai.chronon.spark.stats.ConsistencyJob import ai.chronon.spark.{Join => _, _} @@ -42,7 +42,7 @@ import java.util.concurrent.Executors import scala.collection.Seq import scala.compat.java8.FutureConverters import scala.concurrent.duration.{Duration, SECONDS} -import scala.concurrent.{Await, ExecutionContext} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.io.Source import scala.util.ScalaJavaConversions._ @@ -74,7 +74,10 @@ class FetcherTest extends TestCase { val singleFileMetadataStore = new MetadataStore(inMemoryKvStore, singleFileDataSet, timeoutMillis = 10000) inMemoryKvStore.create(singleFileDataSet) // set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing - val singleFilePut = singleFileMetadataStore.putConf(confResource.getPath) + val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName) + val singleFileDirWalker = new MetadataDirWalker(confResource.getPath, acceptedEndPoints) + val singleFileKvMap: Map[String, List[String]] = singleFileDirWalker.run(MetadataEndPoint.ConfByKeyEndPointName) + val singleFilePut = singleFileMetadataStore.put(singleFileKvMap, MetadataEndPoint.ConfByKeyEndPointName) Await.result(singleFilePut, Duration.Inf) val response = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet)) val res = Await.result(response, Duration.Inf) @@ -86,7 +89,9 @@ class FetcherTest extends TestCase { val directoryDataSetDataSet = ChrononMetadataKey + "_directory_test" val directoryMetadataStore = new MetadataStore(inMemoryKvStore, directoryDataSetDataSet, timeoutMillis = 10000) inMemoryKvStore.create(directoryDataSetDataSet) - val directoryPut = directoryMetadataStore.putConf(confResource.getPath.replace(s"/$joinPath", "")) + val directoryDataDirWalker = new MetadataDirWalker(confResource.getPath.replace(s"/$joinPath", ""), acceptedEndPoints) + val directoryDataKvMap: Map[String, List[String]] = directoryDataDirWalker.run(MetadataEndPoint.ConfByKeyEndPointName) + val directoryPut = singleFileMetadataStore.put(directoryDataKvMap, MetadataEndPoint.ConfByKeyEndPointName) Await.result(directoryPut, Duration.Inf) val dirResponse = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), directoryDataSetDataSet)) From 3eab235d9268ddde9151b88229856cdc03948a18 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Fri, 17 May 2024 11:32:11 -0700 Subject: [PATCH 22/33] fix tests --- spark/src/main/scala/ai/chronon/spark/Driver.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index bac7c5b1e..5d11e35bb 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -27,7 +27,11 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.commons.io.FileUtils import 
org.apache.spark.SparkFiles import org.apache.spark.sql.streaming.StreamingQueryListener -import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent} +import org.apache.spark.sql.streaming.StreamingQueryListener.{ + QueryProgressEvent, + QueryStartedEvent, + QueryTerminatedEvent +} import org.apache.spark.sql.{DataFrame, SparkSession, SparkSessionExtensions} import org.apache.thrift.TBase import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand} From 014be42f8c8b9d017ad25cb88f0ce4dbfdf10909 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 20 May 2024 10:54:34 -0700 Subject: [PATCH 23/33] update values string parsing --- online/src/main/scala/ai/chronon/online/Api.scala | 6 ++++++ online/src/main/scala/ai/chronon/online/MetadataStore.scala | 3 ++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/online/src/main/scala/ai/chronon/online/Api.scala b/online/src/main/scala/ai/chronon/online/Api.scala index 55f1c6a96..ad0ccf40b 100644 --- a/online/src/main/scala/ai/chronon/online/Api.scala +++ b/online/src/main/scala/ai/chronon/online/Api.scala @@ -83,6 +83,12 @@ trait KVStore { dataset: String): Array[Byte] = { groupByServingInfo.keyCodec.encode(keys) } + + // Method for taking the sequence of values and constructing the byte array sent to the KVStore + // e.g: [a, b, c] -> ["a", "b", "c"] + def createValueBytes(values: Seq[String]): Array[Byte] = { + values.map(s => s""""$s"""").mkString("[", ", ", "]").getBytes + } } /** diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index 22abe5059..1be0c4b6d 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -160,10 +160,11 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, |key: $k |conf: $v""".stripMargin) val kBytes = k.getBytes() + // if value is a single string, use it as is, else join the strings into a json list val vBytes = if (v.isEmpty) Array.emptyByteArray else if (v.length == 1) v.head.getBytes() - else v.mkString(",").getBytes() + else kvStore.createValueBytes(v) PutRequest(keyBytes = kBytes, valueBytes = vBytes, dataset = datasetName, From bd3d3dc80aea2bbd14ac43f3ba2e651da52997ef Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 20 May 2024 14:55:06 -0700 Subject: [PATCH 24/33] update the parsing code --- .../src/main/scala/ai/chronon/online/Api.scala | 16 ++++++++++++---- .../scala/ai/chronon/online/MetadataStore.scala | 5 +---- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/Api.scala b/online/src/main/scala/ai/chronon/online/Api.scala index ad0ccf40b..ded0c0f17 100644 --- a/online/src/main/scala/ai/chronon/online/Api.scala +++ b/online/src/main/scala/ai/chronon/online/Api.scala @@ -20,6 +20,8 @@ import org.slf4j.LoggerFactory import ai.chronon.api.{Constants, StructType} import ai.chronon.online.KVStore.{GetRequest, GetResponse, PutRequest} import org.apache.spark.sql.SparkSession +import java.util.Base64 +import java.nio.charset.StandardCharsets import java.util.function.Consumer import scala.collection.Seq @@ -84,10 +86,16 @@ trait KVStore { groupByServingInfo.keyCodec.encode(keys) } - // Method for taking the sequence of values and constructing the byte array sent to the KVStore - // e.g: [a, b, c] -> ["a", "b", "c"] - def createValueBytes(values: Seq[String]): Array[Byte] = { - values.map(s 
=> s""""$s"""").mkString("[", ", ", "]").getBytes + // Method to convert an array of strings to a byte array using Base64 encoding for each element + def stringsToBytes(strings: Seq[String]): Array[Byte] = { + val base64EncodedStrings = strings.map(s => Base64.getEncoder.encodeToString(s.getBytes(StandardCharsets.UTF_8))) + base64EncodedStrings.mkString(",").getBytes(StandardCharsets.UTF_8) + } + + // Method to convert a byte array back to an array of strings by decoding Base64 + def bytesToStrings(bytes: Array[Byte]): Seq[String] = { + val encodedString = new String(bytes, StandardCharsets.UTF_8) + encodedString.split(",").map(s => new String(Base64.getDecoder.decode(s), StandardCharsets.UTF_8)) } } diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index 1be0c4b6d..239ca561c 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -161,10 +161,7 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, |conf: $v""".stripMargin) val kBytes = k.getBytes() // if value is a single string, use it as is, else join the strings into a json list - val vBytes = - if (v.isEmpty) Array.emptyByteArray - else if (v.length == 1) v.head.getBytes() - else kvStore.createValueBytes(v) + val vBytes = kvStore.stringsToBytes(v) PutRequest(keyBytes = kBytes, valueBytes = vBytes, dataset = datasetName, From fe49dfe8cb8ae135cfd87645bdda88affb68597b Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 20 May 2024 14:57:36 -0700 Subject: [PATCH 25/33] fix bug --- spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index 759b497ca..0265aa6ee 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -91,7 +91,7 @@ class FetcherTest extends TestCase { inMemoryKvStore.create(directoryDataSetDataSet) val directoryDataDirWalker = new MetadataDirWalker(confResource.getPath.replace(s"/$joinPath", ""), acceptedEndPoints) val directoryDataKvMap: Map[String, List[String]] = directoryDataDirWalker.run(MetadataEndPoint.ConfByKeyEndPointName) - val directoryPut = singleFileMetadataStore.put(directoryDataKvMap, MetadataEndPoint.ConfByKeyEndPointName) + val directoryPut = directoryMetadataStore.put(directoryDataKvMap, MetadataEndPoint.ConfByKeyEndPointName) Await.result(directoryPut, Duration.Inf) val dirResponse = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), directoryDataSetDataSet)) From f5de21496f0375ea2c48888e3658782e80f9e605 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 20 May 2024 14:59:10 -0700 Subject: [PATCH 26/33] reformat --- spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index 0265aa6ee..4cb475204 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -69,12 +69,12 @@ class FetcherTest extends TestCase { finally src.close() }.replaceAll("\\s+", "") + val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName) val inMemoryKvStore = 
OnlineUtils.buildInMemoryKVStore("FetcherTest") val singleFileDataSet = ChrononMetadataKey + "_single_file_test" val singleFileMetadataStore = new MetadataStore(inMemoryKvStore, singleFileDataSet, timeoutMillis = 10000) inMemoryKvStore.create(singleFileDataSet) // set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing - val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName) val singleFileDirWalker = new MetadataDirWalker(confResource.getPath, acceptedEndPoints) val singleFileKvMap: Map[String, List[String]] = singleFileDirWalker.run(MetadataEndPoint.ConfByKeyEndPointName) val singleFilePut = singleFileMetadataStore.put(singleFileKvMap, MetadataEndPoint.ConfByKeyEndPointName) From 24ae2c04a6dbab1af6477a2097bfe00e2072a342 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 20 May 2024 23:31:27 -0700 Subject: [PATCH 27/33] test --- .../scala/ai/chronon/spark/test/FetcherTest.scala | 14 +++++++++----- .../ai/chronon/spark/test/InMemoryKvStore.scala | 10 +++++++++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index 4cb475204..c38368c68 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -76,9 +76,13 @@ class FetcherTest extends TestCase { inMemoryKvStore.create(singleFileDataSet) // set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing val singleFileDirWalker = new MetadataDirWalker(confResource.getPath, acceptedEndPoints) - val singleFileKvMap: Map[String, List[String]] = singleFileDirWalker.run(MetadataEndPoint.ConfByKeyEndPointName) - val singleFilePut = singleFileMetadataStore.put(singleFileKvMap, MetadataEndPoint.ConfByKeyEndPointName) - Await.result(singleFilePut, Duration.Inf) + val singleFileKvMap = singleFileDirWalker.run + val vBytes = inMemoryKvStore.stringsToBytes(singleFileKvMap(MetadataEndPoint.ConfByKeyEndPointName).values.head) + //assertEquals(s"[test] ${vBytes}", "test") + val singleFilePut: Seq[Future[scala.collection.Seq[Boolean]]] = singleFileKvMap.toSeq.map { + case (endPoint, kvMap) => singleFileMetadataStore.put(kvMap, endPoint) + } + singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration.Inf)) val response = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet)) val res = Await.result(response, Duration.Inf) assertTrue(res.latest.isSuccess) @@ -86,7 +90,7 @@ class FetcherTest extends TestCase { assertEquals(expected, actual.replaceAll("\\s+", "")) - val directoryDataSetDataSet = ChrononMetadataKey + "_directory_test" + /*val directoryDataSetDataSet = ChrononMetadataKey + "_directory_test" val directoryMetadataStore = new MetadataStore(inMemoryKvStore, directoryDataSetDataSet, timeoutMillis = 10000) inMemoryKvStore.create(directoryDataSetDataSet) val directoryDataDirWalker = new MetadataDirWalker(confResource.getPath.replace(s"/$joinPath", ""), acceptedEndPoints) @@ -99,7 +103,7 @@ class FetcherTest extends TestCase { assertTrue(dirRes.latest.isSuccess) val dirActual = new String(dirRes.values.get.head.bytes) - assertEquals(expected, dirActual.replaceAll("\\s+", "")) + assertEquals(expected, dirActual.replaceAll("\\s+", ""))*/ val emptyResponse = inMemoryKvStore.get(GetRequest("NoneExistKey".getBytes(), "NonExistDataSetName")) diff --git 
a/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala b/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala index 4aa7c9db1..11348cd92 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala @@ -77,7 +77,15 @@ class InMemoryKvStore(tableUtils: () => TableUtils) extends KVStore with Seriali case PutRequest(keyBytes, valueBytes, dataset, millis) => val table = database.get(dataset) val key = encode(keyBytes) - table.compute(key, putFunc(millis.getOrElse(System.currentTimeMillis()) -> valueBytes)) + val m = millis.getOrElse(System.currentTimeMillis()) + logger.info(s"[test] ${valueBytes}") + table.compute( + key, + putFunc( + 1L -> + Array() + ) + ) true } From e24f8466611e078bb655b79341d4c3881b4c0157 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Wed, 22 May 2024 14:02:21 -0700 Subject: [PATCH 28/33] fix tests --- .../main/scala/ai/chronon/online/Api.scala | 2 +- .../ai/chronon/spark/test/FetcherTest.scala | 20 +++++++++---------- .../chronon/spark/test/InMemoryKvStore.scala | 10 +--------- 3 files changed, 12 insertions(+), 20 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/Api.scala b/online/src/main/scala/ai/chronon/online/Api.scala index ded0c0f17..7ab520df6 100644 --- a/online/src/main/scala/ai/chronon/online/Api.scala +++ b/online/src/main/scala/ai/chronon/online/Api.scala @@ -64,7 +64,7 @@ trait KVStore { if (response.values.isFailure) { Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get)) } else { - Success(new String(response.latest.get.bytes, Constants.UTF8)) + Success(bytesToStrings(response.latest.get.bytes).head) } } diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index c38368c68..893de20be 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -77,33 +77,33 @@ class FetcherTest extends TestCase { // set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing val singleFileDirWalker = new MetadataDirWalker(confResource.getPath, acceptedEndPoints) val singleFileKvMap = singleFileDirWalker.run - val vBytes = inMemoryKvStore.stringsToBytes(singleFileKvMap(MetadataEndPoint.ConfByKeyEndPointName).values.head) - //assertEquals(s"[test] ${vBytes}", "test") val singleFilePut: Seq[Future[scala.collection.Seq[Boolean]]] = singleFileKvMap.toSeq.map { - case (endPoint, kvMap) => singleFileMetadataStore.put(kvMap, endPoint) + case (endPoint, kvMap) => singleFileMetadataStore.put(kvMap, singleFileDataSet) } singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration.Inf)) val response = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet)) val res = Await.result(response, Duration.Inf) assertTrue(res.latest.isSuccess) - val actual = new String(res.values.get.head.bytes) + val actual = inMemoryKvStore.bytesToStrings(res.values.get.head.bytes).head assertEquals(expected, actual.replaceAll("\\s+", "")) - /*val directoryDataSetDataSet = ChrononMetadataKey + "_directory_test" + val directoryDataSetDataSet = ChrononMetadataKey + "_directory_test" val directoryMetadataStore = new MetadataStore(inMemoryKvStore, directoryDataSetDataSet, timeoutMillis = 10000) inMemoryKvStore.create(directoryDataSetDataSet) val directoryDataDirWalker = new 
MetadataDirWalker(confResource.getPath.replace(s"/$joinPath", ""), acceptedEndPoints) - val directoryDataKvMap: Map[String, List[String]] = directoryDataDirWalker.run(MetadataEndPoint.ConfByKeyEndPointName) - val directoryPut = directoryMetadataStore.put(directoryDataKvMap, MetadataEndPoint.ConfByKeyEndPointName) - Await.result(directoryPut, Duration.Inf) + val directoryDataKvMap = directoryDataDirWalker.run + val directoryPut = directoryDataKvMap.toSeq.map { + case (endPoint, kvMap) => directoryMetadataStore.put(kvMap, directoryDataSetDataSet) + } + directoryPut.flatMap(putRequests => Await.result(putRequests, Duration.Inf)) val dirResponse = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), directoryDataSetDataSet)) val dirRes = Await.result(dirResponse, Duration.Inf) assertTrue(dirRes.latest.isSuccess) - val dirActual = new String(dirRes.values.get.head.bytes) + val dirActual = inMemoryKvStore.bytesToStrings(res.values.get.head.bytes).head - assertEquals(expected, dirActual.replaceAll("\\s+", ""))*/ + assertEquals(expected, dirActual.replaceAll("\\s+", "")) val emptyResponse = inMemoryKvStore.get(GetRequest("NoneExistKey".getBytes(), "NonExistDataSetName")) diff --git a/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala b/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala index 11348cd92..4aa7c9db1 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala @@ -77,15 +77,7 @@ class InMemoryKvStore(tableUtils: () => TableUtils) extends KVStore with Seriali case PutRequest(keyBytes, valueBytes, dataset, millis) => val table = database.get(dataset) val key = encode(keyBytes) - val m = millis.getOrElse(System.currentTimeMillis()) - logger.info(s"[test] ${valueBytes}") - table.compute( - key, - putFunc( - 1L -> - Array() - ) - ) + table.compute(key, putFunc(millis.getOrElse(System.currentTimeMillis()) -> valueBytes)) true } From 2b8ecc64e838aedcbf1d09d7675b97555c4d6fa5 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Tue, 28 May 2024 12:17:46 -0700 Subject: [PATCH 29/33] add code for testing --- online/src/main/scala/ai/chronon/online/Api.scala | 11 +++++++++-- .../main/scala/ai/chronon/online/MetadataStore.scala | 2 +- .../scala/ai/chronon/spark/test/FetcherTest.scala | 6 +++--- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/Api.scala b/online/src/main/scala/ai/chronon/online/Api.scala index 7ab520df6..30cef2f75 100644 --- a/online/src/main/scala/ai/chronon/online/Api.scala +++ b/online/src/main/scala/ai/chronon/online/Api.scala @@ -64,7 +64,7 @@ trait KVStore { if (response.values.isFailure) { Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get)) } else { - Success(bytesToStrings(response.latest.get.bytes).head) + Success(StringArrayConverter.bytesToStrings(response.latest.get.bytes).head) } } @@ -85,16 +85,23 @@ trait KVStore { dataset: String): Array[Byte] = { groupByServingInfo.keyCodec.encode(keys) } +} +object StringArrayConverter { + @transient lazy val logger = LoggerFactory.getLogger(getClass) // Method to convert an array of strings to a byte array using Base64 encoding for each element def stringsToBytes(strings: Seq[String]): Array[Byte] = { val base64EncodedStrings = strings.map(s => Base64.getEncoder.encodeToString(s.getBytes(StandardCharsets.UTF_8))) - base64EncodedStrings.mkString(",").getBytes(StandardCharsets.UTF_8) + logger.info(s"Encoding: 
$base64EncodedStrings") + val a = base64EncodedStrings.mkString(",").getBytes(StandardCharsets.UTF_8) + logger.info(s"put: $a") + a } // Method to convert a byte array back to an array of strings by decoding Base64 def bytesToStrings(bytes: Array[Byte]): Seq[String] = { val encodedString = new String(bytes, StandardCharsets.UTF_8) + logger.info(s"Decoding: $encodedString") encodedString.split(",").map(s => new String(Base64.getDecoder.decode(s), StandardCharsets.UTF_8)) } } diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index 239ca561c..aa7b80c71 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -161,7 +161,7 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, |conf: $v""".stripMargin) val kBytes = k.getBytes() // if value is a single string, use it as is, else join the strings into a json list - val vBytes = kvStore.stringsToBytes(v) + val vBytes = StringArrayConverter.stringsToBytes(v) PutRequest(keyBytes = kBytes, valueBytes = vBytes, dataset = datasetName, diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index d773abc1f..dec6ecd45 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -25,7 +25,7 @@ import ai.chronon.api.Extensions.{DerivationOps, JoinOps, MetadataOps} import ai.chronon.api._ import ai.chronon.online.Fetcher.{Request, Response, StatsRequest} import ai.chronon.online.KVStore.GetRequest -import ai.chronon.online.{JavaRequest, LoggableResponseBase64, MetadataDirWalker, MetadataEndPoint, MetadataStore, SparkConversions} +import ai.chronon.online.{JavaRequest, LoggableResponseBase64, MetadataDirWalker, MetadataEndPoint, MetadataStore, SparkConversions, StringArrayConverter} import ai.chronon.spark.Extensions._ import ai.chronon.spark.stats.ConsistencyJob import ai.chronon.spark.{Join => _, _} @@ -84,7 +84,7 @@ class FetcherTest extends TestCase { val response = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet)) val res = Await.result(response, Duration.Inf) assertTrue(res.latest.isSuccess) - val actual = inMemoryKvStore.bytesToStrings(res.values.get.head.bytes).head + val actual = StringArrayConverter.bytesToStrings(res.values.get.head.bytes).head assertEquals(expected, actual.replaceAll("\\s+", "")) @@ -101,7 +101,7 @@ class FetcherTest extends TestCase { inMemoryKvStore.get(GetRequest(joinPath.getBytes(), directoryDataSetDataSet)) val dirRes = Await.result(dirResponse, Duration.Inf) assertTrue(dirRes.latest.isSuccess) - val dirActual = inMemoryKvStore.bytesToStrings(res.values.get.head.bytes).head + val dirActual = StringArrayConverter.bytesToStrings(res.values.get.head.bytes).head assertEquals(expected, dirActual.replaceAll("\\s+", "")) From 072a9f4b61cbfdbff664533722304b5a2cec34c3 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Tue, 28 May 2024 13:43:19 -0700 Subject: [PATCH 30/33] fix bug --- online/src/main/scala/ai/chronon/online/Api.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/online/src/main/scala/ai/chronon/online/Api.scala b/online/src/main/scala/ai/chronon/online/Api.scala index 30cef2f75..3f0d79a6d 100644 --- a/online/src/main/scala/ai/chronon/online/Api.scala +++ b/online/src/main/scala/ai/chronon/online/Api.scala @@ -58,6 +58,7 @@ trait KVStore { // 
helper method to blocking read a string - used for fetching metadata & not in hotpath. def getString(key: String, dataset: String, timeoutMillis: Long): Try[String] = { - logger.info(s"[test] key = ${key} dataset = ${dataset}") - val fetchRequest = KVStore.GetRequest(key.getBytes(Constants.UTF8), dataset) - val responseFutureOpt = get(fetchRequest) - val response = Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS)) + val response = getResponse(key, dataset, timeoutMillis) if (response.values.isFailure) { Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get)) } else { - Success(StringArrayConverter.bytesToStrings(response.latest.get.bytes).head) + Success(new String(response.latest.get.bytes, Constants.UTF8)) } } + def getStringArray(key: String, dataset: String, timeoutMillis: Long): Try[Seq[String]] = { + val response = getResponse(key, dataset, timeoutMillis) + if (response.values.isFailure) { + Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get)) + } else { + Success(StringArrayConverter.bytesToStrings(response.latest.get.bytes)) + } + } + + private def getResponse(key: String, dataset: String, timeoutMillis: Long): GetResponse = { + val fetchRequest = KVStore.GetRequest(key.getBytes(Constants.UTF8), dataset) + val responseFutureOpt = get(fetchRequest) + Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS)) + } def get(request: GetRequest): Future[GetResponse] = { multiGet(Seq(request)) .map(_.head) diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index aa7b80c71..2eb128cfd 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -161,7 +161,7 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, |conf: $v""".stripMargin) val kBytes = k.getBytes() // if value is a single string, use it as is, else join the strings into a json list - val vBytes = StringArrayConverter.stringsToBytes(v) + val vBytes = if (v.size == 1) v.head.getBytes() else StringArrayConverter.stringsToBytes(v) PutRequest(keyBytes = kBytes, valueBytes = vBytes, dataset = datasetName, From c16a043184c22dd6bf8cb927cf1b9d6e7ad33628 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Tue, 28 May 2024 18:22:53 -0700 Subject: [PATCH 32/33] fix tests
--- spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index dec6ecd45..c67bf1670 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -84,7 +84,7 @@ class FetcherTest extends TestCase { val response = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet)) val res = Await.result(response, Duration.Inf) assertTrue(res.latest.isSuccess) - val actual = StringArrayConverter.bytesToStrings(res.values.get.head.bytes).head + val actual = new String(res.values.get.head.bytes) assertEquals(expected, actual.replaceAll("\\s+", "")) @@ -101,7 +101,7 @@ class FetcherTest extends TestCase { inMemoryKvStore.get(GetRequest(joinPath.getBytes(), directoryDataSetDataSet)) val dirRes = Await.result(dirResponse, Duration.Inf) assertTrue(dirRes.latest.isSuccess) - val dirActual = StringArrayConverter.bytesToStrings(res.values.get.head.bytes).head + val dirActual = new String(dirRes.values.get.head.bytes) assertEquals(expected, dirActual.replaceAll("\\s+", "")) From bede60531dfdf753326d3f1bfa12e571854dfb17 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Wed, 29 May 2024 11:27:39 -0700 Subject: [PATCH 33/33] remove test code --- online/src/main/scala/ai/chronon/online/Api.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/Api.scala b/online/src/main/scala/ai/chronon/online/Api.scala index 84f9a39a3..d01c15caa 100644 --- a/online/src/main/scala/ai/chronon/online/Api.scala +++ b/online/src/main/scala/ai/chronon/online/Api.scala @@ -104,16 +104,12 @@ object StringArrayConverter { // Method to convert an array of strings to a byte array using Base64 encoding for each element def stringsToBytes(strings: Seq[String]): Array[Byte] = { val base64EncodedStrings = strings.map(s => Base64.getEncoder.encodeToString(s.getBytes(StandardCharsets.UTF_8))) - logger.info(s"Encoding: $base64EncodedStrings") - val a = base64EncodedStrings.mkString(",").getBytes(StandardCharsets.UTF_8) - logger.info(s"put: $a") - a + base64EncodedStrings.mkString(",").getBytes(StandardCharsets.UTF_8) } // Method to convert a byte array back to an array of strings by decoding Base64 def bytesToStrings(bytes: Array[Byte]): Seq[String] = { val encodedString = new String(bytes, StandardCharsets.UTF_8) - logger.info(s"Decoding: $encodedString") encodedString.split(",").map(s => new String(Base64.getDecoder.decode(s), StandardCharsets.UTF_8)) } }
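
Note on the value encoding the series converges on (patches 23 through 33): the earlier v.mkString(",") join from patch 18 is ambiguous once a value itself contains a comma, and compiled Chronon confs are JSON, so they usually do. The per-element Base64 scheme that patch 33 leaves in place avoids this because Base64's alphabet never contains ','. Below is a minimal standalone sketch of that final scheme with a round-trip check; the object name mirrors StringArrayConverter and the payloads are hypothetical, so treat this as an illustration rather than the shipped code.

    import java.nio.charset.StandardCharsets
    import java.util.Base64

    object StringArrayConverterSketch {
      // Base64-encode each element, then comma-join the encoded parts; the
      // separator can never collide with commas inside the raw values.
      def stringsToBytes(strings: Seq[String]): Array[Byte] =
        strings
          .map(s => Base64.getEncoder.encodeToString(s.getBytes(StandardCharsets.UTF_8)))
          .mkString(",")
          .getBytes(StandardCharsets.UTF_8)

      // Reverse: split on ',' and Base64-decode each part. Edge case: an
      // empty input list decodes back as Seq("") because "".split(",")
      // yields Array(""), so callers should guard empties if that matters.
      def bytesToStrings(bytes: Array[Byte]): Seq[String] =
        new String(bytes, StandardCharsets.UTF_8)
          .split(",")
          .map(s => new String(Base64.getDecoder.decode(s), StandardCharsets.UTF_8))

      def main(args: Array[String]): Unit = {
        // Hypothetical compiled-conf payloads; the embedded commas are
        // exactly what a plain mkString(",") join cannot recover.
        val values = Seq("""{"name":"team.example_join.v1","online":true}""",
                         """{"name":"team.other_join.v1"}""")
        assert(bytesToStrings(stringsToBytes(values)) == values)
      }
    }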
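
Note on the upload flow: per the doc comment added in patch 18, MetadataDirWalker.run produces a map of endpoint -> (key -> values), and Driver.run (patches 17 and 19) issues one MetadataStore.put per endpoint before blocking on the combined futures. A compact sketch of that flow follows; PutLike is a stand-in trait and the kvMap contents are made up for illustration, since the real MetadataStore API is not reproduced here.

    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}

    object MetadataUploadSketch {
      // Stand-in for MetadataStore.put: one dataset, many key -> values pairs.
      trait PutLike {
        def put(kvMap: Map[String, List[String]], dataset: String): Future[Seq[Boolean]]
      }

      // Shape of MetadataDirWalker.run's output: endpoint -> (key -> values).
      val kvMap: Map[String, Map[String, List[String]]] = Map(
        "ZIPLINE_METADATA" -> Map(
          "joins/team/team.example_join.v1" -> List("""{"metaData":{"name":"team.example_join.v1"}}""")),
        "CHRONON_METADATA_BY_TEAM" -> Map(
          "team" -> List("team.example_join.v1", "team.other_join.v1")))

      // One put per endpoint, then a blocking flatten of per-request success
      // flags, mirroring the final form of Driver.run.
      def uploadAll(store: PutLike): (Int, Int) = {
        val putRequests: Seq[Future[Seq[Boolean]]] =
          kvMap.toSeq.map { case (endPoint, confs) => store.put(confs, endPoint) }
        val results = putRequests.flatMap(f => Await.result(f, 1.hour))
        (results.count(identity), results.count(!_))
      }
    }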
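
Note on the put-side rule settled in patch 31: a lone value is written as raw UTF-8 bytes, so existing getString callers keep working against previously written data, while only multi-value lists go through the Base64 scheme and pair with the new getStringArray. Sketched below, reusing the converter object from the first sketch; the ValueEncodingSketch wrapper is hypothetical.

    import java.nio.charset.StandardCharsets

    object ValueEncodingSketch {
      // Single value: raw UTF-8 (read back with getString).
      // Multiple values: Base64 list encoding (read back with getStringArray).
      // An empty list falls through to stringsToBytes and yields empty bytes.
      def encodeValues(v: Seq[String]): Array[Byte] =
        if (v.size == 1) v.head.getBytes(StandardCharsets.UTF_8)
        else StringArrayConverterSketch.stringsToBytes(v)
    }

One consequence of this split worth keeping in mind: nothing in the stored bytes distinguishes a raw single value from a Base64-encoded list, so the reader has to know each key's cardinality by convention and pick getString or getStringArray accordingly.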