-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Build metadata endpoint and directory walker #760
Changes from 6 commits
fe93815
4957cef
a68e52d
2de6147
1bd55eb
b38a834
b694d7f
2b89ea7
b4c6360
ed0ae29
3640121
dea0a11
bc9055b
03bbaed
ad23ee5
b26c67f
b4962be
1966436
8b257e6
a01d799
a6b0f91
3eab235
014be42
bd3d3dc
fe49dfe
f5de214
24ae2c0
e24f846
fb38f61
2b8ecc6
4e0488a
072a9f4
4990267
c16a043
bede605
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
package ai.chronon.online | ||
|
||
import org.slf4j.LoggerFactory | ||
import ai.chronon.online.MetadataEndPoint | ||
import com.google.gson.Gson | ||
|
||
import java.io.File | ||
import java.nio.file.{Files, Paths} | ||
|
||
/**
  * Walks a directory of compiled Chronon configs and, for each configured metadata
  * endpoint, builds a map of key -> list of serialized values ready to be written
  * to the KV store.
  *
  * @param dirPath root directory (or single file) containing Chronon config JSON files
  * @param metadataEndPointNames names of endpoints registered in MetadataEndPoint.NameToEndPoint
  */
class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String]) {
  @transient implicit lazy val logger = LoggerFactory.getLogger(getClass)

  /** Recursively list all regular files under `base`; a plain file yields itself. */
  private def listFiles(base: File, recursive: Boolean = true): Seq[File] = {
    if (base.isFile) {
      Seq(base)
    } else {
      val files = base.listFiles
      val result = files.filter(_.isFile)
      result ++
        files
          .filter(_.isDirectory)
          .filter(_ => recursive)
          .flatMap(listFiles(_, recursive))
    }
  }

  /**
    * Parse the config file at `path` as JSON and extract `metaData.name`.
    * Returns None (and logs the error) when the file is not valid JSON or the
    * name is absent — such files are skipped by the walker.
    */
  private def parseName(path: String): Option[String] = {
    val gson = new Gson()
    val reader = Files.newBufferedReader(Paths.get(path))
    try {
      val map = gson.fromJson(reader, classOf[java.util.Map[String, AnyRef]])
      Option(map.get("metaData"))
        .map(_.asInstanceOf[java.util.Map[String, AnyRef]])
        .map(_.get("name"))
        .flatMap(Option(_))
        .map(_.asInstanceOf[String])
    } catch {
      case ex: Throwable =>
        // The logger call already records the stack trace; no need for printStackTrace.
        logger.error(s"Failed to parse Chronon config file at $path as JSON", ex)
        None
    } finally {
      // Previously the reader was leaked on both success and failure paths.
      reader.close()
    }
  }

  /** All files under `dirPath`; fails fast when the path does not exist. */
  lazy val fileList: Seq[File] = {
    val configFile = new File(dirPath)
    assert(configFile.exists(), s"$configFile does not exist")
    logger.info(s"Uploading Chronon configs from $dirPath")
    listFiles(configFile)
  }

  /** Files that parse as Chronon configs (i.e. have a metaData.name); others are logged and skipped. */
  lazy val nonEmptyFileList: Seq[File] = {
    fileList
      .filter { file =>
        val name = parseName(file.getPath)
        if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}")
        name.isDefined
      }
  }

  /** Run the endpoint's extractor on a single config file, yielding its optional key/value. */
  def extractKVPair(metadataEndPointName: String, filePath: String): (Option[String], Option[String]) = {
    val endPoint: MetadataEndPoint = MetadataEndPoint.NameToEndPoint(metadataEndPointName)
    val (key, value) = endPoint.extractFn(filePath)
    (key, value)
  }

  /**
    * Walk every parseable config file and build, per endpoint name, a map of
    * key -> accumulated list of values.
    *
    * Bug fix: the previous implementation computed each endpoint's update from the
    * same snapshot of the accumulator and flattened them with `.toMap` (last entry
    * wins), so stale accumulator entries carried inside a later endpoint's map
    * clobbered earlier endpoints' updates once more than one file was processed.
    * The accumulator is now threaded through both folds.
    */
  def run: Map[String, Map[String, List[String]]] = {
    nonEmptyFileList.foldLeft(Map.empty[String, Map[String, List[String]]]) { (acc, file) =>
      metadataEndPointNames.foldLeft(acc) { (endPointAcc, endPointName) =>
        extractKVPair(endPointName, file.getPath) match {
          case (Some(key), Some(value)) =>
            val kvMap = endPointAcc.getOrElse(endPointName, Map.empty[String, List[String]])
            val values = kvMap.getOrElse(key, List.empty[String]) :+ value
            endPointAcc.updated(endPointName, kvMap.updated(key, values))
          case _ => endPointAcc
        }
      }
    }
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package ai.chronon.online | ||
|
||
import ai.chronon.api.Extensions.{filePathOps} | ||
import org.slf4j.LoggerFactory | ||
|
||
/**
  * A named metadata sink: `extractFn` maps a config file path to an optional
  * (key, value) pair destined for the dataset identified by `name`.
  */
case class MetadataEndPoint(extractFn: String => (Option[String], Option[String]), name: String)

object MetadataEndPoint {
  @transient implicit lazy val logger = LoggerFactory.getLogger(getClass)

  // key: entity path, e.g. joins/team/team.example_join.v1
  // value: entity config in json format
  val ConfByKeyEndPoint = MetadataEndPoint(
    name = "ZIPLINE_METADATA",
    extractFn = confPath => (Some(confPath.confPathToKey), confPath.confPathToOptConfStr)
  )

  // key: entity type + team name, e.g. joins/team
  // value: list of entities under the team, e.g. joins/team/team.example_join.v1, joins/team/team.example_join.v2
  val NameByTeamEndPoint = MetadataEndPoint(
    name = "ZIPLINE_METADATA_BY_TEAM",
    extractFn = confPath => (confPath.confPathToTeamKey, Some(confPath.confPathToKey))
  )

  /** Lookup table from endpoint name to its definition. */
  val NameToEndPoint: Map[String, MetadataEndPoint] =
    Seq(ConfByKeyEndPoint, NameByTeamEndPoint).map(endPoint => endPoint.name -> endPoint).toMap
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,15 +18,14 @@ package ai.chronon.online | |
|
||
import org.slf4j.LoggerFactory | ||
import ai.chronon.api.Constants.{ChrononMetadataKey, UTF8} | ||
import ai.chronon.api.Extensions.{JoinOps, MetadataOps, StringOps, WindowOps, WindowUtils} | ||
import ai.chronon.api.Extensions.{JoinOps, MetadataOps, StringOps, WindowOps, WindowUtils, filePathOps} | ||
import ai.chronon.api._ | ||
import ai.chronon.online.KVStore.{GetRequest, PutRequest, TimedValue} | ||
import com.google.gson.{Gson, GsonBuilder} | ||
import org.apache.thrift.TBase | ||
|
||
import java.io.File | ||
import java.nio.file.{Files, Paths} | ||
import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} | ||
import scala.collection.immutable.SortedMap | ||
import scala.collection.Seq | ||
import scala.concurrent.{ExecutionContext, Future} | ||
|
@@ -165,12 +164,7 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, | |
} | ||
.flatMap { file => | ||
val path = file.getPath | ||
val confJsonOpt = path match { | ||
case value if value.contains("staging_queries/") => loadJson[StagingQuery](value) | ||
case value if value.contains("joins/") => loadJson[Join](value) | ||
case value if value.contains("group_bys/") => loadJson[GroupBy](value) | ||
case _ => logger.info(s"unknown config type in file $path"); None | ||
} | ||
val confJsonOpt = path.confPathToOptConfStr | ||
val key = path.confPathToKey | ||
confJsonOpt.map { conf => | ||
logger.info(s"""Putting metadata for | ||
|
@@ -188,6 +182,34 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, | |
Future.sequence(futures).map(_.flatten) | ||
} | ||
|
||
/**
  * Write a batch of key -> values pairs to the KV store.
  *
  * @param kVPairs     key to list-of-serialized-values; multiple values for one
  *                    key are newline-joined into a single payload
  * @param datasetName target dataset (e.g. the metadata endpoint name)
  * @param batchSize   number of PutRequests issued per multiPut call
  * @return one success flag per put, across all batches
  */
def put(
    kVPairs: Map[String, Seq[String]],
    datasetName: String = ChrononMetadataKey,
    batchSize: Int = CONF_BATCH_SIZE
): Future[Seq[Boolean]] = {
  val puts = kVPairs.map {
    case (k, v) => {
      logger.info(s"""Putting metadata for
           |dataset: $datasetName
           |key: $k
           |conf: $v""".stripMargin)
      val kBytes = k.getBytes()
      // Empty -> empty payload; single value -> its raw bytes; many -> newline-joined.
      val vBytes =
        if (v.isEmpty) Array.emptyByteArray
        else if (v.length == 1) v.head.getBytes()
        else v.mkString("\n").getBytes()
      // Bug fix: previously wrote to the class-level `dataset` field, silently
      // ignoring the `datasetName` parameter that callers (e.g. Driver) pass
      // with the endpoint name — every endpoint landed in the default dataset.
      PutRequest(keyBytes = kBytes,
                 valueBytes = vBytes,
                 dataset = datasetName,
                 tsMillis = Some(System.currentTimeMillis()))
    }
  }.toSeq
  val putsBatches = puts.grouped(batchSize).toSeq
  logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$datasetName")
  val futures = putsBatches.map(batch => kvStore.multiPut(batch))
  Future.sequence(futures).map(_.flatten)
}
|
||
// list file recursively | ||
private def listFiles(base: File, recursive: Boolean = true): Seq[File] = { | ||
if (base.isFile) { | ||
|
@@ -203,19 +225,6 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, | |
} | ||
} | ||
|
||
// process chronon configs only. others will be ignored | ||
// todo: add metrics | ||
private def loadJson[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[String] = { | ||
try { | ||
val configConf = ThriftJsonCodec.fromJsonFile[T](file, check = true) | ||
Some(ThriftJsonCodec.toJsonStr(configConf)) | ||
} catch { | ||
case e: Throwable => | ||
logger.error(s"Failed to parse compiled Chronon config file: $file, \nerror=${e.getMessage}") | ||
None | ||
} | ||
} | ||
|
||
def parseName(path: String): Option[String] = { | ||
val gson = new Gson() | ||
val reader = Files.newBufferedReader(Paths.get(path)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,9 +17,9 @@ | |
package ai.chronon.spark | ||
|
||
import ai.chronon.api | ||
import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps, StringOps} | ||
import ai.chronon.api.Extensions.{GroupByOps, MetadataOps, SourceOps, StringOps, filePathOps} | ||
import ai.chronon.api.ThriftJsonCodec | ||
import ai.chronon.online.{Api, Fetcher, MetadataStore} | ||
import ai.chronon.online.{Api, Fetcher, MetadataDirWalker, MetadataStore} | ||
import ai.chronon.spark.stats.{CompareBaseJob, CompareJob, ConsistencyJob, SummaryJob} | ||
import ai.chronon.spark.streaming.{JoinSourceRunner, TopicChecker} | ||
import com.fasterxml.jackson.databind.ObjectMapper | ||
|
@@ -41,7 +41,7 @@ import java.io.File | |
import java.nio.file.{Files, Paths} | ||
import scala.collection.JavaConverters._ | ||
import scala.collection.mutable | ||
import scala.concurrent.Await | ||
import scala.concurrent.{Await, ExecutionContext, Future} | ||
import scala.concurrent.duration.DurationInt | ||
import scala.io.Source | ||
import scala.reflect.ClassTag | ||
|
@@ -732,8 +732,16 @@ object Driver { | |
} | ||
|
||
/**
  * Walk the config directory and upload metadata for every accepted endpoint
  * to the KV store, one dataset per endpoint.
  */
def run(args: Args): Unit = {
  // Endpoints to materialize. Currently fixed; reviewers suggested sourcing
  // this from conf — TODO(config): make it a passed-in option.
  val acceptedEndPoints = List("ZIPLINE_METADATA", "ZIPLINE_METADATA_BY_TEAM")
  val dirWalker = new MetadataDirWalker(args.confPath(), acceptedEndPoints)
  // endpoint name -> (key -> serialized config values)
  val kvMap = dirWalker.run
  // Edge-of-program entry point, so the global pool is acceptable here.
  implicit val ec: ExecutionContext = ExecutionContext.global
  // Renamed the inner pattern variable: it previously shadowed the outer `kvMap`.
  val putRequestsIterable: Iterable[Future[scala.collection.Seq[Boolean]]] = kvMap.map {
    case (endPoint, kvPairs) => args.metaDataStore.put(kvPairs, endPoint)
  }
  // `.map` replaces the redundant `flatMap(seq => Future.successful(...))`.
  val putRequests: Future[scala.collection.Seq[Boolean]] =
    Future.sequence(putRequestsIterable).map(_.flatten.toSeq)
  val res = Await.result(putRequests, 1.hour)
  logger.info(
    s"Uploaded Chronon Configs to the KV store, success count = ${res.count(v => v)}, failure count = ${res.count(!_)}")
}
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we should make it so that we only parse the file once and run a bunch of extractors for the conf type.
So we need to maintain a list of (key, value) extractors for groupBys, joins and stagingQueries.
We simply parse once, run all the extractors per file, print it to console and write to metastore.