Build metadata endpoint and directory walker #760

Merged · 35 commits · May 30, 2024
35 changes: 32 additions & 3 deletions online/src/main/scala/ai/chronon/online/Api.scala
@@ -20,6 +20,8 @@ import org.slf4j.LoggerFactory
import ai.chronon.api.{Constants, StructType}
import ai.chronon.online.KVStore.{GetRequest, GetResponse, PutRequest}
import org.apache.spark.sql.SparkSession
import java.util.Base64
import java.nio.charset.StandardCharsets

import java.util.function.Consumer
import scala.collection.Seq
@@ -56,16 +58,28 @@ trait KVStore {

// helper method to do a blocking read of a string - used for fetching metadata & not in the hot path.
def getString(key: String, dataset: String, timeoutMillis: Long): Try[String] = {
val fetchRequest = KVStore.GetRequest(key.getBytes(Constants.UTF8), dataset)
val responseFutureOpt = get(fetchRequest)
val response = Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))
val response = getResponse(key, dataset, timeoutMillis)
if (response.values.isFailure) {
Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get))
} else {
Success(new String(response.latest.get.bytes, Constants.UTF8))
}
}

def getStringArray(key: String, dataset: String, timeoutMillis: Long): Try[Seq[String]] = {
val response = getResponse(key, dataset, timeoutMillis)
if (response.values.isFailure) {
Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get))
} else {
Success(StringArrayConverter.bytesToStrings(response.latest.get.bytes))
}
}

private def getResponse(key: String, dataset: String, timeoutMillis: Long): GetResponse = {
val fetchRequest = KVStore.GetRequest(key.getBytes(Constants.UTF8), dataset)
val responseFutureOpt = get(fetchRequest)
Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))
}
def get(request: GetRequest): Future[GetResponse] = {
multiGet(Seq(request))
.map(_.head)
@@ -85,6 +99,21 @@
}
}
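
For the read side, a minimal sketch of how a caller might exercise the two blocking helpers above; store is any concrete KVStore implementation, and the key and dataset names here are illustrative assumptions, not values fixed by this PR:

import ai.chronon.online.KVStore
import scala.util.{Failure, Success}

def readTeamMetadata(store: KVStore): Unit = {
  // Single-value read: one conf json stored under its conf-path key.
  store.getString("joins/team/team.example_join.v1", "ZIPLINE_METADATA", 10000L) match {
    case Success(json) => println(s"conf json: $json")
    case Failure(e)    => println(s"lookup failed: ${e.getMessage}")
  }
  // Multi-value read: the list of entity names stored under a team key, decoded
  // from the comma-joined Base64 format produced by StringArrayConverter below.
  store.getStringArray("joins/team", "CHRONON_METADATA_BY_TEAM", 10000L) match {
    case Success(names) => names.foreach(println)
    case Failure(e)     => println(s"lookup failed: ${e.getMessage}")
  }
}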

object StringArrayConverter {
@transient lazy val logger = LoggerFactory.getLogger(getClass)
// Method to convert an array of strings to a byte array using Base64 encoding for each element
def stringsToBytes(strings: Seq[String]): Array[Byte] = {
val base64EncodedStrings = strings.map(s => Base64.getEncoder.encodeToString(s.getBytes(StandardCharsets.UTF_8)))
base64EncodedStrings.mkString(",").getBytes(StandardCharsets.UTF_8)
}

// Method to convert a byte array back to an array of strings by decoding Base64
def bytesToStrings(bytes: Array[Byte]): Seq[String] = {
val encodedString = new String(bytes, StandardCharsets.UTF_8)
encodedString.split(",").map(s => new String(Base64.getDecoder.decode(s), StandardCharsets.UTF_8))
}
}
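
A quick round-trip through the converter, as a sanity check: the per-element Base64 encoding is what lets values that themselves contain the "," delimiter survive the comma-joined wire format. One caveat, accurate to the code above: an empty input Seq does not round-trip (it decodes back as Seq("")).

import ai.chronon.online.StringArrayConverter

object StringArrayConverterExample {
  def main(args: Array[String]): Unit = {
    // Elements may safely contain the "," delimiter because each one is
    // Base64-encoded before the list is joined.
    val names = Seq("joins/team/team.example_join.v1", "a,b,c")
    val bytes = StringArrayConverter.stringsToBytes(names)
    val decoded = StringArrayConverter.bytesToStrings(bytes)
    assert(decoded == names) // round-trip preserves the original elements
    // Caveat: Seq.empty encodes to zero bytes, which decodes back as Seq("").
  }
}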

/**
* ==== MUTATION vs. EVENT ====
* Mutation is the general case of an Event
130 changes: 130 additions & 0 deletions online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala
@@ -0,0 +1,130 @@
package ai.chronon.online

import ai.chronon.api.ThriftJsonCodec
import ai.chronon.api
import org.slf4j.LoggerFactory
import com.google.gson.Gson
import org.apache.thrift.TBase

import java.io.File
import java.nio.file.{Files, Paths}
import scala.reflect.ClassTag

class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) {
@transient implicit lazy val logger = LoggerFactory.getLogger(getClass)
private def listFiles(base: File, recursive: Boolean = true): Seq[File] = {
if (base.isFile) {
Seq(base)
} else {
val files = base.listFiles
val result = files.filter(_.isFile)
result ++
files
.filter(_.isDirectory)
.filter(_ => recursive)
.flatMap(listFiles(_, recursive))
}
}

private def loadJsonToConf[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[T] = {
try {
val configConf = ThriftJsonCodec.fromJsonFile[T](file, check = true)
Some(configConf)
} catch {
case e: Throwable =>
logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}")
None
}
}
private def parseName(path: String): Option[String] = {
val gson = new Gson()
val reader = Files.newBufferedReader(Paths.get(path))
try {
val map = gson.fromJson(reader, classOf[java.util.Map[String, AnyRef]])
Option(map.get("metaData"))
.map(_.asInstanceOf[java.util.Map[String, AnyRef]])
.map(_.get("name"))
.flatMap(Option(_))
.map(_.asInstanceOf[String])
} catch {
case ex: Throwable =>
logger.error(s"Failed to parse Chronon config file at $path as JSON", ex)
ex.printStackTrace()
None
}
}

lazy val fileList: Seq[File] = {
val configFile = new File(dirPath)
assert(configFile.exists(), s"$configFile does not exist")
logger.info(s"Uploading Chronon configs from $dirPath")
listFiles(configFile)
}

lazy val nonEmptyFileList: Seq[File] = {
fileList
.filter { file =>
val name = parseName(file.getPath)
if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}")
name.isDefined
}
}

/**
* Iterate over the list of files and extract the key value pairs for each file
* @return Map of endpoint -> (Map of key -> List of values)
* e.g. (
* CHRONON_METADATA_BY_TEAM -> (team -> List("join1", "join2")),
* CHRONON_METADATA -> (teams/joins/join1 -> config1)
* )
*/
def run: Map[String, Map[String, List[String]]] = {
nonEmptyFileList.foldLeft(Map.empty[String, Map[String, List[String]]]) { (acc, file) =>
// For each end point we apply the extractFn to the file path to extract the key value pair
val filePath = file.getPath
val optConf =
try {
filePath match {
case value if value.contains("joins/") => loadJsonToConf[api.Join](filePath)
case value if value.contains("group_bys/") => loadJsonToConf[api.GroupBy](filePath)
case value if value.contains("staging_queries/") => loadJsonToConf[api.StagingQuery](filePath)
}
} catch {
case e: Throwable =>
logger.error(s"Failed to parse compiled team from file path: $filePath, \nerror=${e.getMessage}")
None
}
if (optConf.isDefined) {
val kvPairToEndPoint: List[(String, (String, String))] = metadataEndPointNames.map { endPointName =>
val conf = optConf.get
val kVPair = filePath match {
case value if value.contains("joins/") =>
MetadataEndPoint.getEndPoint[api.Join](endPointName).extractFn(filePath, conf.asInstanceOf[api.Join])
case value if value.contains("group_bys/") =>
MetadataEndPoint
.getEndPoint[api.GroupBy](endPointName)
.extractFn(filePath, conf.asInstanceOf[api.GroupBy])
case value if value.contains("staging_queries/") =>
MetadataEndPoint
.getEndPoint[api.StagingQuery](endPointName)
.extractFn(filePath, conf.asInstanceOf[api.StagingQuery])
}
(endPointName, kVPair)
}

kvPairToEndPoint
.map(kvPair => {
val endPoint = kvPair._1
val (key, value) = kvPair._2
val map = acc.getOrElse(endPoint, Map.empty[String, List[String]])
val list = map.getOrElse(key, List.empty[String]) ++ List(value)
(endPoint, map.updated(key, list))
})
.toMap
} else {
logger.info(s"Skipping invalid file ${file.getPath}")
acc
}
}
}
}
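
A sketch of how the walker might be driven, assuming a compiled-configs directory named production/ laid out with joins/, group_bys/, and staging_queries/ subdirectories; the driver wiring here is illustrative, not part of this diff:

import ai.chronon.online.{MetadataDirWalker, MetadataEndPoint}

object WalkerExample {
  def main(args: Array[String]): Unit = {
    val walker = new MetadataDirWalker(
      dirPath = "production", // hypothetical root of compiled Chronon configs
      metadataEndPointNames = List(
        MetadataEndPoint.ConfByKeyEndPointName,  // key = conf path, value = conf json
        MetadataEndPoint.NameByTeamEndPointName  // key = "<type>/<team>", value = conf paths
      )
    )
    // endpoint -> (key -> values): a one-element list per key for ConfByKey,
    // a list of entity paths per team key for NameByTeam.
    val kvPairs: Map[String, Map[String, List[String]]] = walker.run
    kvPairs.foreach { case (endPoint, kvs) => println(s"$endPoint: ${kvs.size} keys") }
  }
}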
56 changes: 56 additions & 0 deletions online/src/main/scala/ai/chronon/online/MetadataEndPoint.scala
@@ -0,0 +1,56 @@
package ai.chronon.online

import ai.chronon.api.Extensions.StringOps
import ai.chronon.api.{GroupBy, Join, StagingQuery, ThriftJsonCodec}
import org.apache.thrift.TBase
import org.slf4j.LoggerFactory

import scala.reflect.ClassTag

case class MetadataEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag](
extractFn: (String, Conf) => (String, String),
name: String
)
object MetadataEndPoint {
@transient implicit lazy val logger = LoggerFactory.getLogger(getClass)

val ConfByKeyEndPointName = "ZIPLINE_METADATA"
val NameByTeamEndPointName = "CHRONON_METADATA_BY_TEAM"

private def parseTeam[Conf <: TBase[_, _]: Manifest: ClassTag](conf: Conf): String = {
conf match {
case join: Join => "joins/" + join.metaData.team
case groupBy: GroupBy => "group_bys/" + groupBy.metaData.team
case stagingQuery: StagingQuery => "staging_queries/" + stagingQuery.metaData.team
case _ =>
logger.error(s"Failed to parse team from $conf")
throw new Exception(s"Failed to parse team from $conf")
}
}

// key: entity path, e.g. joins/team/team.example_join.v1
// value: entity config in json format
private def confByKeyEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag] =
new MetadataEndPoint[Conf](
extractFn = (path, conf) => (path.confPathToKey, ThriftJsonCodec.toJsonStr(conf)),
name = ConfByKeyEndPointName
)

// key: entity type + team name, e.g. joins/team
// value: list of entities under the team, e.g. joins/team/team.example_join.v1, joins/team/team.example_join.v2
private def NameByTeamEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag] =
new MetadataEndPoint[Conf](
extractFn = (path, conf) => (parseTeam[Conf](conf), path.confPathToKey),
name = NameByTeamEndPointName
)

def getEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag](endPointName: String): MetadataEndPoint[Conf] = {
endPointName match {
case ConfByKeyEndPointName => confByKeyEndPoint[Conf]
case NameByTeamEndPointName => NameByTeamEndPoint[Conf]
case _ =>
logger.error(s"Failed to find endpoint for $endPointName")
throw new Exception(s"Failed to find endpoint for $endPointName")
}
}
}
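
For intuition, here is roughly how the two endpoints map one compiled Join to KV pairs. The fluent setter calls follow the usual thrift-generated API and the path is made up, so treat this as a sketch rather than a test from this PR:

import ai.chronon.api.{Join, MetaData}
import ai.chronon.online.MetadataEndPoint

object EndPointExample {
  def main(args: Array[String]): Unit = {
    val join = new Join()
    join.setMetaData(new MetaData().setName("team.example_join.v1").setTeam("team"))
    val path = "production/joins/team/team.example_join.v1" // hypothetical compiled path

    // ConfByKey: key is the entity path, value is the conf serialized to json.
    val byKey = MetadataEndPoint.getEndPoint[Join](MetadataEndPoint.ConfByKeyEndPointName)
    val (confKey, confJson) = byKey.extractFn(path, join) // confKey e.g. "joins/team/team.example_join.v1"

    // NameByTeam: key is entity type + team, value is the entity path.
    val byTeam = MetadataEndPoint.getEndPoint[Join](MetadataEndPoint.NameByTeamEndPointName)
    val (teamKey, entity) = byTeam.extractFn(path, join) // teamKey == "joins/team"

    println(s"$confKey -> ${confJson.take(40)}...")
    println(s"$teamKey -> $entity")
  }
}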
104 changes: 22 additions & 82 deletions online/src/main/scala/ai/chronon/online/MetadataStore.scala
@@ -26,7 +26,6 @@ import org.apache.thrift.TBase

import java.io.File
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
import scala.collection.immutable.SortedMap
import scala.collection.Seq
import scala.concurrent.{ExecutionContext, Future}
@@ -39,7 +38,7 @@ case class DataMetrics(series: Seq[(Long, SortedMap[String, Any])])
class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, timeoutMillis: Long) {
@transient implicit lazy val logger = LoggerFactory.getLogger(getClass)
private var partitionSpec = PartitionSpec(format = "yyyy-MM-dd", spanMillis = WindowUtils.Day.millis)
private val CONF_BATCH_SIZE = 50
private val CONF_BATCH_SIZE = 100
Collaborator:

We could make this configurable. We had some put-request batch-oversize errors from our KV store before; that's why we set it to 50 to be on the safer side.

Collaborator (Author):

Yep, it is a configurable parameter in the put function: https://github.com/airbnb/chronon/blob/19664365ace064476d49b6830c980612fa2627d5/online/src/main/scala/ai/chronon/online/MetadataStore.scala#L154c I increased this to 100 because 50 doesn't work well for GroupBys (there are about 1k GroupBys and 200 joins).

// Note this should match with the format used in the warehouse
def setPartitionMeta(format: String, spanMillis: Long): Unit = {
@@ -149,88 +148,29 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey,
},
{ gb => Metrics.Context(environment = "group_by.serving_info.fetch", groupBy = gb) })

// upload the materialized JSONs to KV store:
// key = <conf_type>/<team>/<conf_name> in bytes e.g joins/team/team.example_join.v1 value = materialized json string in bytes
def putConf(configPath: String): Future[Seq[Boolean]] = {
val configFile = new File(configPath)
assert(configFile.exists(), s"$configFile does not exist")
logger.info(s"Uploading Chronon configs from $configPath")
val fileList = listFiles(configFile)

val puts = fileList
.filter { file =>
val name = parseName(file.getPath)
if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}")
name.isDefined
def put(
kVPairs: Map[String, Seq[String]],
datasetName: String = ChrononMetadataKey,
batchSize: Int = CONF_BATCH_SIZE
): Future[Seq[Boolean]] = {
val puts = kVPairs.map {
case (k, v) => {
logger.info(s"""Putting metadata for
|dataset: $datasetName
|key: $k
|conf: $v""".stripMargin)
val kBytes = k.getBytes()
// if value is a single string, use it as is, else join the strings into a json list
val vBytes = if (v.size == 1) v.head.getBytes() else StringArrayConverter.stringsToBytes(v)
PutRequest(keyBytes = kBytes,
valueBytes = vBytes,
dataset = datasetName,
tsMillis = Some(System.currentTimeMillis()))
}
.flatMap { file =>
val path = file.getPath
val confJsonOpt = path match {
case value if value.contains("staging_queries/") => loadJson[StagingQuery](value)
case value if value.contains("joins/") => loadJson[Join](value)
case value if value.contains("group_bys/") => loadJson[GroupBy](value)
case _ => logger.info(s"unknown config type in file $path"); None
}
val key = path.confPathToKey
confJsonOpt.map { conf =>
logger.info(s"""Putting metadata for
|key: $key
|conf: $conf""".stripMargin)
PutRequest(keyBytes = key.getBytes(),
valueBytes = conf.getBytes(),
dataset = dataset,
tsMillis = Some(System.currentTimeMillis()))
}
}
val putsBatches = puts.grouped(CONF_BATCH_SIZE).toSeq
logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$dataset")
}.toSeq
val putsBatches = puts.grouped(batchSize).toSeq
logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$datasetName")
val futures = putsBatches.map(batch => kvStore.multiPut(batch))
Future.sequence(futures).map(_.flatten)
}

// list file recursively
private def listFiles(base: File, recursive: Boolean = true): Seq[File] = {
if (base.isFile) {
Seq(base)
} else {
val files = base.listFiles
val result = files.filter(_.isFile)
result ++
files
.filter(_.isDirectory)
.filter(_ => recursive)
.flatMap(listFiles(_, recursive))
}
}

// process chronon configs only. others will be ignored
// todo: add metrics
private def loadJson[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[String] = {
try {
val configConf = ThriftJsonCodec.fromJsonFile[T](file, check = true)
Some(ThriftJsonCodec.toJsonStr(configConf))
} catch {
case e: Throwable =>
logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}")
None
}
}

def parseName(path: String): Option[String] = {
val gson = new Gson()
val reader = Files.newBufferedReader(Paths.get(path))
try {
val map = gson.fromJson(reader, classOf[java.util.Map[String, AnyRef]])
Option(map.get("metaData"))
.map(_.asInstanceOf[java.util.Map[String, AnyRef]])
.map(_.get("name"))
.flatMap(Option(_))
.map(_.asInstanceOf[String])
} catch {
case ex: Throwable =>
logger.error(s"Failed to parse Chronon config file at $path as JSON", ex)
ex.printStackTrace()
None
}
}
}
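
Tying the pieces together, and picking up the batch-size thread from the review comments above: put takes batchSize as a parameter, so a deployment whose KV store rejects oversized put batches can dial it back down from the new default of 100. A minimal sketch, assuming kvStore is any concrete KVStore implementation and that each endpoint name doubles as its dataset name:

import ai.chronon.online.{KVStore, MetadataStore}
import scala.concurrent.Await
import scala.concurrent.duration._

def uploadConfigs(kvStore: KVStore, kvPairs: Map[String, Map[String, List[String]]]): Unit = {
  val store = new MetadataStore(kvStore, timeoutMillis = 10000L)
  kvPairs.foreach { case (endPointName, kvs) =>
    // batchSize = 50 mirrors the more conservative pre-PR value, for KV stores
    // with small multiPut limits.
    val result = store.put(kvs, datasetName = endPointName, batchSize = 50)
    Await.result(result, 1.minute)
  }
}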