Build metadata endpoint and directory walker #760

Merged · 35 commits · May 30, 2024

Changes from 3 commits
55 changes: 54 additions & 1 deletion api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -23,11 +23,13 @@ import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.expr
import org.apache.thrift.TBase

import java.io.{PrintWriter, StringWriter}
import java.util
import java.util.regex.Pattern
import scala.collection.{Seq, mutable}
import scala.reflect.ClassTag
import scala.util.ScalaJavaConversions.{IteratorOps, ListOps, MapOps}
import scala.util.{Failure, Success, Try}

@@ -634,11 +636,62 @@ object Extensions {
def sanitize: String = Option(string).map(_.replaceAll("[^a-zA-Z0-9_]", "_")).orNull

def cleanSpec: String = string.split("/").head
}

implicit class filePathOps(filePath: String) {
@transient lazy val logger = LoggerFactory.getLogger(getClass)
// process chronon configs only. others will be ignored
// todo: add metrics

private def loadJsonToConf[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[T] = {
try {
val configConf = ThriftJsonCodec.fromJsonFile[T](file, check = true)
Some(configConf)
} catch {
case e: Throwable =>
logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}")
None
}
}
private def loadJson[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[String] = {
try {
val configConf = loadJsonToConf[T](file).get
Some(ThriftJsonCodec.toJsonStr(configConf))
} catch {
case e: Throwable =>
logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}")
None
}
}
// derive a feature name key from path to file
def confPathToKey: String = {
      // capture <conf_type>/<team>/<conf_name> as key, e.g. joins/team/team.example_join.v1
      filePath.split("/").takeRight(3).mkString("/")
}

def confPathToTeam: Option[String] = {
try {
filePath match {
case value if value.contains("staging_queries/") => loadJsonToConf[StagingQuery](value).map(_.metaData.team)
case value if value.contains("joins/") => loadJsonToConf[Join](value).map(_.metaData.team)
case value if value.contains("group_bys/") => loadJsonToConf[GroupBy](value).map(_.metaData.team)
case _ => logger.info(s"unknown config type in file $filePath"); None
}
} catch {
case e: Throwable =>
logger.error(s"Failed to parse compiled team from file path: $filePath, \nerror=${e.getMessage}")
None
}
}

def confPathToOptConfStr: Option[String] = {
val confJsonOpt = filePath match {
case value if value.contains("staging_queries/") => loadJson[StagingQuery](value)
case value if value.contains("joins/") => loadJson[Join](value)
case value if value.contains("group_bys/") => loadJson[GroupBy](value)
case _ => logger.info(s"unknown config type in file $filePath"); None
}
confJsonOpt
}
}
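
For illustration, a minimal sketch of how these helpers behave once the implicit is in scope. The path below is hypothetical; confPathToTeam and confPathToOptConfStr read the file from disk, so they return None unless such a file actually exists and parses:

    import ai.chronon.api.Extensions.filePathOps

    val path = "production/joins/team_a/team_a.example_join.v1"
    path.confPathToKey        // "joins/team_a/team_a.example_join.v1" (pure string manipulation)
    path.confPathToTeam       // Some("team_a") if the file exists and parses as a Join; None otherwise
    path.confPathToOptConfStr // Some(<canonical conf JSON>) on success; None on parse failure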

85 changes: 85 additions & 0 deletions online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala
@@ -0,0 +1,85 @@
package ai.chronon.online

import org.slf4j.LoggerFactory
import ai.chronon.online.MetadataEndPoint
import com.google.gson.Gson

import java.io.File
import java.nio.file.{Files, Paths}

class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) {
@transient implicit lazy val logger = LoggerFactory.getLogger(getClass)
private def listFiles(base: File, recursive: Boolean = true): Seq[File] = {
if (base.isFile) {
Seq(base)
} else {
val files = base.listFiles
val result = files.filter(_.isFile)
result ++
files
.filter(_.isDirectory)
.filter(_ => recursive)
.flatMap(listFiles(_, recursive))
}
}

private def parseName(path: String): Option[String] = {
val gson = new Gson()
val reader = Files.newBufferedReader(Paths.get(path))
try {
val map = gson.fromJson(reader, classOf[java.util.Map[String, AnyRef]])
Option(map.get("metaData"))
.map(_.asInstanceOf[java.util.Map[String, AnyRef]])
.map(_.get("name"))
.flatMap(Option(_))
.map(_.asInstanceOf[String])
} catch {
case ex: Throwable =>
logger.error(s"Failed to parse Chronon config file at $path as JSON", ex)
ex.printStackTrace()
None
}
}
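
For illustration, a self-contained sketch of the lookup parseName performs, assuming compiled configs carry a top-level metaData.name field (the JSON below is illustrative):

    import com.google.gson.Gson

    val json = """{"metaData": {"name": "team_a.example_join.v1"}}"""
    val map = new Gson().fromJson(json, classOf[java.util.Map[String, AnyRef]])
    val name = Option(map.get("metaData"))
      .map(_.asInstanceOf[java.util.Map[String, AnyRef]])
      .flatMap(m => Option(m.get("name")))
      .map(_.asInstanceOf[String])
    // name == Some("team_a.example_join.v1")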

lazy val fileList: Seq[File] = {
val configFile = new File(dirPath)
assert(configFile.exists(), s"$configFile does not exist")
logger.info(s"Uploading Chronon configs from $dirPath")
listFiles(configFile)
}

lazy val nonEmptyFileList: Seq[File] = {
fileList
.filter { file =>
val name = parseName(file.getPath)
if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}")
name.isDefined
}
}

def extractKVPair(metadataEndPointName: String, filePath: String): (Option[String], Option[String]) = {
val endPoint: MetadataEndPoint = MetadataEndPoint.NameToEndPoint(metadataEndPointName)
val (key, value) = endPoint.extractFn(filePath)
(key, value)
}

  def run: Map[String, Map[String, List[String]]] = {
    nonEmptyFileList.foldLeft(Map.empty[String, Map[String, List[String]]]) { (acc, file) =>
      // Fold over the endpoints as well, so an update for one endpoint is not
      // dropped when the next endpoint's view of the same accumulator is merged in.
      metadataEndPointNames.foldLeft(acc) { (innerAcc, endPointName) =>
        val (key, value) = extractKVPair(endPointName, file.getPath)
        if (key.isDefined && value.isDefined) {
          val map = innerAcc.getOrElse(endPointName, Map.empty[String, List[String]])
          val list = map.getOrElse(key.get, List.empty[String]) :+ value.get
          innerAcc.updated(endPointName, map.updated(key.get, list))
        } else innerAcc
      }
    }
  }
}
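
A hypothetical end-to-end use of the walker (the directory path is illustrative; the endpoint names are the ones defined in MetadataEndPoint below):

    val walker = new MetadataDirWalker(
      dirPath = "/srv/chronon/production",
      metadataEndPointNames = List("ZIPLINE_METADATA", "ZIPLINE_METADATA_BY_TEAM"))
    val kvMap: Map[String, Map[String, List[String]]] = walker.run
    // endpoint name -> (key -> accumulated values), e.g.
    // "ZIPLINE_METADATA" -> Map("joins/team_a/team_a.example_join.v1" -> List(<conf JSON>))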
25 changes: 25 additions & 0 deletions online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala
@@ -0,0 +1,25 @@
package ai.chronon.online

import ai.chronon.api.Extensions.filePathOps
import org.slf4j.LoggerFactory

case class MetadataEndPoint(extractFn: String => (Option[String], Option[String]), name: String)

object MetadataEndPoint {
@transient implicit lazy val logger = LoggerFactory.getLogger(getClass)

val ConfByKeyEndPoint = new MetadataEndPoint(
extractFn = confPath => (Some(confPath.confPathToKey), confPath.confPathToOptConfStr),
name = "ZIPLINE_METADATA"
)

val NameByTeamEndPoint = new MetadataEndPoint(
extractFn = confPath => (confPath.confPathToTeam, Some(confPath.confPathToKey)),
name = "ZIPLINE_METADATA_BY_TEAM"
)

val NameToEndPoint: Map[String, MetadataEndPoint] = Map(
ConfByKeyEndPoint.name -> ConfByKeyEndPoint,
NameByTeamEndPoint.name -> NameByTeamEndPoint
)
}
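
For illustration, what each endpoint would extract for a hypothetical compiled conf path (assuming the file exists on disk and parses as a GroupBy):

    import ai.chronon.api.Extensions.filePathOps

    val confPath = "production/group_bys/team_a/team_a.example_gb.v1"
    MetadataEndPoint.NameToEndPoint("ZIPLINE_METADATA").extractFn(confPath)
    // -> (Some("group_bys/team_a/team_a.example_gb.v1"), Some(<conf JSON>))
    MetadataEndPoint.NameToEndPoint("ZIPLINE_METADATA_BY_TEAM").extractFn(confPath)
    // -> (Some("team_a"), Some("group_bys/team_a/team_a.example_gb.v1"))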
51 changes: 30 additions & 21 deletions online/src/main/scala/ai/chronon/online/MetadataStore.scala
@@ -18,15 +18,14 @@ package ai.chronon.online

import org.slf4j.LoggerFactory
import ai.chronon.api.Constants.{ChrononMetadataKey, UTF8}
import ai.chronon.api.Extensions.{JoinOps, MetadataOps, StringOps, WindowOps, WindowUtils, filePathOps}
import ai.chronon.api._
import ai.chronon.online.KVStore.{GetRequest, PutRequest, TimedValue}
import com.google.gson.{Gson, GsonBuilder}
import org.apache.thrift.TBase

import java.io.File
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
import scala.collection.immutable.SortedMap
import scala.collection.Seq
import scala.concurrent.{ExecutionContext, Future}
@@ -165,12 +164,7 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey,
}
.flatMap { file =>
val path = file.getPath
        val confJsonOpt = path.confPathToOptConfStr
val key = path.confPathToKey
confJsonOpt.map { conf =>
logger.info(s"""Putting metadata for
@@ -188,6 +182,34 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey,
Future.sequence(futures).map(_.flatten)
}

def put(
kVPairs: Map[String, Seq[String]],
datasetName: String = ChrononMetadataKey,
batchSize: Int = CONF_BATCH_SIZE
): Future[Seq[Boolean]] = {
val puts = kVPairs.map {
case (k, v) => {
logger.info(s"""Putting metadata for
|dataset: $datasetName
|key: $k
|conf: $v""".stripMargin)
val kBytes = k.getBytes()
val vBytes =
if (v.isEmpty) Array.emptyByteArray
else if (v.length == 1) v.head.getBytes()
else v.mkString("\n").getBytes()
Collaborator: why do we want this? If it is already bytes, what is the use of \n?

Collaborator Author (@yuli-han, May 15, 2024): @better365 after the change the value can be a list of strings, and it is not clear what would end up in the KV store if getBytes were applied to a list rather than a string, so I join the list into a single string first and then convert that to bytes.

Collaborator: I see. What will be used for the list of the configs? It would be easier to code it out here.
        PutRequest(keyBytes = kBytes,
                   valueBytes = vBytes,
                   dataset = datasetName,
Collaborator Author (@yuli-han): I am using the end point name as dataset here.

tsMillis = Some(System.currentTimeMillis()))
}
}.toSeq
val putsBatches = puts.grouped(batchSize).toSeq
logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$dataset")
val futures = putsBatches.map(batch => kvStore.multiPut(batch))
Future.sequence(futures).map(_.flatten)
}
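
A minimal sketch of the value encoding round-trip discussed in the thread above; the read side shown here is an assumption, not code from this PR:

    val values = Seq("joins/team_a/a.v1", "joins/team_a/b.v1")
    val bytes = values.mkString("\n").getBytes()      // write side, as in put(...)
    val decoded = new String(bytes).split("\n").toSeq // hypothetical read side
    assert(decoded == values)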

  // list files recursively
private def listFiles(base: File, recursive: Boolean = true): Seq[File] = {
if (base.isFile) {
@@ -203,19 +225,6 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey,
}
}

def parseName(path: String): Option[String] = {
val gson = new Gson()
val reader = Files.newBufferedReader(Paths.get(path))
18 changes: 13 additions & 5 deletions spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -17,9 +17,9 @@
package ai.chronon.spark

import ai.chronon.api
import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps, StringOps, filePathOps}
import ai.chronon.api.ThriftJsonCodec
import ai.chronon.online.{Api, Fetcher, MetadataDirWalker, MetadataStore}
import ai.chronon.spark.stats.{CompareBaseJob, CompareJob, ConsistencyJob, SummaryJob}
import ai.chronon.spark.streaming.{JoinSourceRunner, TopicChecker}
import com.fasterxml.jackson.databind.ObjectMapper
@@ -41,7 +41,7 @@ import java.io.File
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.io.Source
import scala.reflect.ClassTag
@@ -732,8 +732,16 @@ object Driver {
}

def run(args: Args): Unit = {
val acceptedEndPoints = List("ZIPLINE_METADATA", "ZIPLINE_METADATA_BY_TEAM")
Collaborator Author (@yuli-han): We probably can make this a passed-in config, WDYT? @nikhilsimha

Contributor: agreed - we could take this from conf
val dirWalker = new MetadataDirWalker(args.confPath(), acceptedEndPoints)
val kvMap = dirWalker.run
implicit val ec: ExecutionContext = ExecutionContext.global
      val putRequestsIterable: Iterable[Future[scala.collection.Seq[Boolean]]] = kvMap.map {
        case (endPoint, kv) => args.metaDataStore.put(kv, endPoint)
      }
      val putRequests: Future[scala.collection.Seq[Boolean]] =
        Future.sequence(putRequestsIterable).map(_.flatten.toSeq)
val res = Await.result(putRequests, 1.hour)
logger.info(
s"Uploaded Chronon Configs to the KV store, success count = ${res.count(v => v)}, failure count = ${res.count(!_)}")
}