Add metadata upload to Mussel by team #751

Closed
wants to merge 19 commits into from
127 changes: 90 additions & 37 deletions online/src/main/scala/ai/chronon/online/MetadataStore.scala
@@ -155,43 +155,10 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey,
confPath.split("/").takeRight(3).mkString("/")
}

// upload the materialized JSONs to the KV store:
// key = <conf_type>/<team>/<conf_name> in bytes, e.g. joins/team/team.example_join.v1; value = materialized JSON string in bytes
def putConf(configPath: String): Future[Seq[Boolean]] = {
val configFile = new File(configPath)
assert(configFile.exists(), s"$configFile does not exist")
logger.info(s"Uploading Chronon configs from $configPath")
val fileList = listFiles(configFile)

val puts = fileList
.filter { file =>
val name = parseName(file.getPath)
if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}")
name.isDefined
}
.flatMap { file =>
val path = file.getPath
val confJsonOpt = path match {
case value if value.contains("staging_queries/") => loadJson[StagingQuery](value)
case value if value.contains("joins/") => loadJson[Join](value)
case value if value.contains("group_bys/") => loadJson[GroupBy](value)
case _ => logger.info(s"unknown config type in file $path"); None
}
val key = pathToKey(path)
confJsonOpt.map { conf =>
logger.info(s"""Putting metadata for
|key: $key
|conf: $conf""".stripMargin)
PutRequest(keyBytes = key.getBytes(),
valueBytes = conf.getBytes(),
dataset = dataset,
tsMillis = Some(System.currentTimeMillis()))
}
}
val putsBatches = puts.grouped(CONF_BATCH_SIZE).toSeq
logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$dataset")
val futures = putsBatches.map(batch => kvStore.multiPut(batch))
Future.sequence(futures).map(_.flatten)
// derive the team key from a config file path
def pathToTeam(confPath: String): String = {
// capture <conf_type>/<team> as the key, e.g. group_bys/host_churn
confPath.split("/").takeRight(3).take(2).mkString("/")
}
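
For reference, a minimal sketch (not part of this diff) of what the two path helpers yield, assuming a materialized config path of the usual <root>/<conf_type>/<team>/<conf_name> shape:

    // hypothetical REPL check; the "production" root prefix is an assumption
    val samplePath = "production/group_bys/host_churn/agg_deactivations.v1"
    samplePath.split("/").takeRight(3).mkString("/")         // pathToKey  => group_bys/host_churn/agg_deactivations.v1
    samplePath.split("/").takeRight(3).take(2).mkString("/") // pathToTeam => group_bys/host_churn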

// list files recursively
@@ -239,4 +206,90 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey,
None
}
}

// upload the materialized JSONs to the KV store:
// key = <conf_type>/<team>/<conf_name> in bytes, e.g. joins/team/team.example_join.v1; value = materialized JSON string in bytes
def putConfByName(configPath: String): Future[Seq[Boolean]] = {
val configFile = new File(configPath)
assert(configFile.exists(), s"$configFile does not exist")
logger.info(s"Uploading Chronon configs from $configPath")
val fileList = listFiles(configFile)

val puts: Seq[PutRequest] = fileList
.filter { file =>
val name = parseName(file.getPath)
if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}")
name.isDefined
}
.flatMap { file =>
val path = file.getPath
val key = pathToKey(path)
val confJsonOpt = path match {
case value if value.contains("staging_queries/") => loadJson[StagingQuery](value)
case value if value.contains("joins/") => loadJson[Join](value)
case value if value.contains("group_bys/") => loadJson[GroupBy](value)
case _ => logger.info(s"unknown config type in file $path"); None
}

confJsonOpt.map { value =>
logger.info(s"""Putting metadata for
|key: $key
|conf: $value""".stripMargin)
PutRequest(keyBytes = key.getBytes(),
valueBytes = value.getBytes(),
dataset = dataset,
tsMillis = Some(System.currentTimeMillis()))
}
}
val putsBatches = puts.grouped(CONF_BATCH_SIZE).toSeq
logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$dataset")
val futures = putsBatches.map(batch => kvStore.multiPut(batch))
Future.sequence(futures).map(_.flatten)
}

// upload the list of config names per team to the KV store:
// key = <conf_type>/<team> in bytes, e.g. group_bys/host_churn
// value = list of config names in bytes, e.g. List(group_bys/host_churn/agg_p2_impressions.v1, group_bys/host_churn/agg_deactivations.v1)
def putConfByTeam(configPath: String): Future[Seq[Boolean]] = {

val configFile = new File(configPath)
assert(configFile.exists(), s"$configFile does not exist")
logger.info(s"Uploading Chronon configs from $configPath")
val fileList = listFiles(configFile)
val validFileList = fileList
.filter { file =>
val name = parseName(file.getPath)
if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}")
name.isDefined
}

val kvPairs: Map[String, List[String]] = validFileList.foldLeft(Map.empty[String, List[String]]) { (map, file) =>
{
val path = file.getPath
val key = pathToTeam(path)
val value = pathToKey(path)
val updatedList = map.getOrElse(key, List()) :+ value
map + (key -> updatedList)
}
}

val puts: Seq[PutRequest] = kvPairs.map {
case (key, list) => {
val listStr = list.toString()
logger.info(s"""Putting metadata for
|key: $key
|conf: $listStr""".stripMargin)
PutRequest(keyBytes = key.getBytes(),
valueBytes = listStr.getBytes(),
dataset = dataset,
tsMillis = Some(System.currentTimeMillis()))
}
}.toSeq

val putsBatches = puts.grouped(CONF_BATCH_SIZE).toSeq
logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$dataset")
val futures = putsBatches.map(batch => kvStore.multiPut(batch))
Future.sequence(futures).map(_.flatten)
}
}
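
Worth noting for consumers: putConfByTeam serializes the per-team listing with List.toString, so the stored value looks like List(a, b, c) and has to be parsed back. A minimal sketch of a reader-side helper (hypothetical, not part of this PR), assuming the value was written exactly as above:

    // hypothetical: recover config names from the "List(a, b, c)" string
    // that putConfByTeam stores as the value
    def parseTeamListing(value: String): Seq[String] =
      value.stripPrefix("List(").stripSuffix(")").split(",").map(_.trim).filter(_.nonEmpty).toSeq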
10 changes: 7 additions & 3 deletions spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -723,10 +723,14 @@ object Driver {
}

def run(args: Args): Unit = {
val putRequest = args.metaDataStore.putConf(args.confPath())
val putRequest = args.metaDataStore.putConfByName(args.confPath())
val res = Await.result(putRequest, 1.hour)
logger.info(
s"Uploaded Chronon Configs to the KV store, success count = ${res.count(v => v)}, failure count = ${res.count(!_)}")
logger.info(s"Uploaded Chronon ${args.confPath} Configs to the KV store, success count = ${res.count(v =>
v)}, failure count = ${res.count(!_)}")
val putByTeamRequest = args.metaDataStore.putConfByTeam(args.confPath())
val resByTeam = Await.result(putByTeamRequest, 1.hour)
logger.info(s"Uploaded Chronon ${args.confPath} entity by team to the KV store, success count = ${resByTeam.count(
v => v)}, failure count = ${resByTeam.count(!_)}")
}
}
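
The two uploads are independent, so one possible tightening (an assumed refactor, not what this PR does) is to start both and block once:

    // sketch: fire both puts concurrently, then await the combined future
    // (assumes an implicit ExecutionContext is in scope, as elsewhere in Driver)
    val combined = Future.sequence(
      Seq(args.metaDataStore.putConfByName(args.confPath()),
          args.metaDataStore.putConfByTeam(args.confPath())))
    val Seq(resByName, resByTeam) = Await.result(combined, 1.hour)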

4 changes: 2 additions & 2 deletions spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala
@@ -74,7 +74,7 @@ class FetcherTest extends TestCase {
val singleFileMetadataStore = new MetadataStore(inMemoryKvStore, singleFileDataSet, timeoutMillis = 10000)
inMemoryKvStore.create(singleFileDataSet)
// set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing
val singleFilePut = singleFileMetadataStore.putConf(confResource.getPath)
val singleFilePut = singleFileMetadataStore.putConfByName(confResource.getPath)
Await.result(singleFilePut, Duration.Inf)
val response = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet))
val res = Await.result(response, Duration.Inf)
@@ -86,7 +86,7 @@
val directoryDataSetDataSet = ChrononMetadataKey + "_directory_test"
val directoryMetadataStore = new MetadataStore(inMemoryKvStore, directoryDataSetDataSet, timeoutMillis = 10000)
inMemoryKvStore.create(directoryDataSetDataSet)
val directoryPut = directoryMetadataStore.putConf(confResource.getPath.replace(s"/$joinPath", ""))
val directoryPut = directoryMetadataStore.putConfByName(confResource.getPath.replace(s"/$joinPath", ""))
Await.result(directoryPut, Duration.Inf)
val dirResponse =
inMemoryKvStore.get(GetRequest(joinPath.getBytes(), directoryDataSetDataSet))
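
A natural follow-up assertion (hypothetical, not in this diff) would exercise putConfByTeam against the same directory and read a team key back; the joins/team key below mirrors the example in the putConfByTeam comment and may not match the actual test fixtures:

    // hypothetical test sketch for the team-level listing
    val teamPut = directoryMetadataStore.putConfByTeam(confResource.getPath.replace(s"/$joinPath", ""))
    Await.result(teamPut, Duration.Inf)
    val teamResponse = inMemoryKvStore.get(GetRequest("joins/team".getBytes(), directoryDataSetDataSet))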